Skip to content

Commit

Permalink
Limit the number of exemplars to be added for each dimension set (#29242
Browse files Browse the repository at this point in the history
)

**Description:** Fix Memory usage for span metrics by limiting the
number of exemplars per unique dimensions set/metric key
  • Loading branch information
aishyandapalli committed Dec 12, 2023
1 parent f429e34 commit 5c2ef75
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 37 deletions.
27 changes: 27 additions & 0 deletions .chloggen/span-metrics-oom-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix OOM issue for spanmetrics by limiting the number of exemplars that can be added to a unique dimension set

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27451]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ type HistogramConfig struct {
}

type ExemplarsConfig struct {
Enabled bool `mapstructure:"enabled"`
Enabled bool `mapstructure:"enabled"`
MaxPerDataPoint *int `mapstructure:"max_per_data_point"`
}

type ExponentialHistogramConfig struct {
Expand Down
12 changes: 12 additions & 0 deletions connector/spanmetricsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)

defaultMethod := "GET"
defaultMaxPerDatapoint := 5
tests := []struct {
id component.ID
expected component.Config
Expand Down Expand Up @@ -98,6 +99,17 @@ func TestLoadConfig(t *testing.T) {
Exemplars: ExemplarsConfig{Enabled: true},
},
},
{
id: component.NewIDWithName(metadata.Type, "exemplars_enabled_with_max_per_datapoint"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint},
},
},
}

for _, tt := range tests {
Expand Down
8 changes: 4 additions & 4 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
if cfg.Histogram.Exponential.MaxSize != 0 {
maxSize = cfg.Histogram.Exponential.MaxSize
}
return metrics.NewExponentialHistogramMetrics(maxSize)
return metrics.NewExponentialHistogramMetrics(maxSize, cfg.Exemplars.MaxPerDataPoint)
}

var bounds []float64
Expand All @@ -156,7 +156,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
}
}

return metrics.NewExplicitHistogramMetrics(bounds)
return metrics.NewExplicitHistogramMetrics(bounds, cfg.Exemplars.MaxPerDataPoint)
}

// unitDivider returns a unit divider to convert nanoseconds to milliseconds or seconds.
Expand Down Expand Up @@ -396,8 +396,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(),
events: metrics.NewSumMetrics(),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
attributes: attr,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
}
Expand Down
14 changes: 7 additions & 7 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
{
name: "initialize histogram with no config provided",
config: Config{},
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs),
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
},
{
name: "Disable histogram",
Expand All @@ -1247,7 +1247,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
Unit: metrics.Milliseconds,
},
},
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs),
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
},
{
name: "initialize explicit histogram with default bounds (seconds)",
Expand All @@ -1256,7 +1256,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
Unit: metrics.Seconds,
},
},
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds),
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds, nil),
},
{
name: "initialize explicit histogram with bounds (seconds)",
Expand All @@ -1271,7 +1271,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
},
},
},
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}),
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}, nil),
},
{
name: "initialize explicit histogram with bounds (ms)",
Expand All @@ -1286,7 +1286,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
},
},
},
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}),
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}, nil),
},
{
name: "initialize exponential histogram",
Expand All @@ -1298,7 +1298,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
},
},
},
want: metrics.NewExponentialHistogramMetrics(10),
want: metrics.NewExponentialHistogramMetrics(10, nil),
},
{
name: "initialize exponential histogram with default max buckets count",
Expand All @@ -1308,7 +1308,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
Exponential: &ExponentialHistogramConfig{},
},
},
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize),
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize, nil),
},
}
for _, tt := range tests {
Expand Down
76 changes: 51 additions & 25 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ type Histogram interface {
}

type explicitHistogramMetrics struct {
metrics map[Key]*explicitHistogram
bounds []float64
metrics map[Key]*explicitHistogram
bounds []float64
maxExemplarCount *int
}

type exponentialHistogramMetrics struct {
metrics map[Key]*exponentialHistogram
maxSize int32
metrics map[Key]*exponentialHistogram
maxSize int32
maxExemplarCount *int
}

type explicitHistogram struct {
Expand All @@ -44,37 +46,44 @@ type explicitHistogram struct {
sum float64

bounds []float64

maxExemplarCount *int
}

type exponentialHistogram struct {
attributes pcommon.Map
exemplars pmetric.ExemplarSlice

histogram *structure.Histogram[float64]

maxExemplarCount *int
}

func NewExponentialHistogramMetrics(maxSize int32) HistogramMetrics {
func NewExponentialHistogramMetrics(maxSize int32, maxExemplarCount *int) HistogramMetrics {
return &exponentialHistogramMetrics{
metrics: make(map[Key]*exponentialHistogram),
maxSize: maxSize,
metrics: make(map[Key]*exponentialHistogram),
maxSize: maxSize,
maxExemplarCount: maxExemplarCount,
}
}

func NewExplicitHistogramMetrics(bounds []float64) HistogramMetrics {
func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) HistogramMetrics {
return &explicitHistogramMetrics{
metrics: make(map[Key]*explicitHistogram),
bounds: bounds,
metrics: make(map[Key]*explicitHistogram),
bounds: bounds,
maxExemplarCount: maxExemplarCount,
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
bounds: m.bounds,
bucketCounts: make([]uint64, len(m.bounds)+1),
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
bounds: m.bounds,
bucketCounts: make([]uint64, len(m.bounds)+1),
maxExemplarCount: m.maxExemplarCount,
}
m.metrics[key] = h
}
Expand Down Expand Up @@ -128,11 +137,13 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma
histogram.Init(cfg)

h = &exponentialHistogram{
histogram: histogram,
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
histogram: histogram,
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
}
m.metrics[key] = h

}

return h
Expand Down Expand Up @@ -212,6 +223,9 @@ func (h *explicitHistogram) Observe(value float64) {
}

func (h *explicitHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) {
if h.maxExemplarCount != nil && h.exemplars.Len() >= *h.maxExemplarCount {
return
}
e := h.exemplars.AppendEmpty()
e.SetTraceID(traceID)
e.SetSpanID(spanID)
Expand All @@ -223,43 +237,55 @@ func (h *exponentialHistogram) Observe(value float64) {
}

func (h *exponentialHistogram) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) {
if h.maxExemplarCount != nil && h.exemplars.Len() >= *h.maxExemplarCount {
return
}
e := h.exemplars.AppendEmpty()
e.SetTraceID(traceID)
e.SetSpanID(spanID)
e.SetDoubleValue(value)
}

type Sum struct {
attributes pcommon.Map
count uint64
exemplars pmetric.ExemplarSlice
attributes pcommon.Map
count uint64
exemplars pmetric.ExemplarSlice
maxExemplarCount *int
}

func (s *Sum) Add(value uint64) {
s.count += value
}

func NewSumMetrics() SumMetrics {
return SumMetrics{metrics: make(map[Key]*Sum)}
func NewSumMetrics(maxExemplarCount *int) SumMetrics {
return SumMetrics{
metrics: make(map[Key]*Sum),
maxExemplarCount: maxExemplarCount,
}
}

type SumMetrics struct {
metrics map[Key]*Sum
metrics map[Key]*Sum
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
}
m.metrics[key] = s
}
return s
}

func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value float64) {
if s.maxExemplarCount != nil && s.exemplars.Len() >= *s.maxExemplarCount {
return
}
e := s.exemplars.AppendEmpty()
e.SetTraceID(traceID)
e.SetSpanID(spanID)
Expand Down
Loading

0 comments on commit 5c2ef75

Please sign in to comment.