Skip to content

Commit

Permalink
[exporter/awsemfexporter] add exponential histogram support (open-tel…
Browse files Browse the repository at this point in the history
…emetry#22626)

**Description:**

This PR adds [exponential
histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram)
support in `awsemfexporter`. The exponential histogram metrics are
exported in Embedded Metric Format (EMF) log. The Count, Sum, Max and
Min are set as Statistical Set. The mid-point values and counts of
exponential histogram buckets are translated into Values/Counts array of
EMF log entry as well.

**Testing:**

The unit test is added and covers positive, zero and negative values. 

The integration test is performed with following OTEL collector
configuration.
```
extensions:
  health_check:
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch/metrics:
    timeout: 60s

exporters:
  logging:
    verbosity: detailed
  awsemf:
    region: 'us-east-1'
    namespace: "Test"
    dimension_rollup_option: "NoDimensionRollup"

service:
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch/metrics]
      exporters: [awsemf, logging]
  extensions: [health_check]
  telemetry:
    logs:
      level: "debug"
```
It generated EMF log for histogram metrics in following JSON format.
Notes: It doesn't cover negative values since histograms can [only
record non-negative
values](https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram)
and will [drop negative
values](https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleHistogram.java#L38C7-L44).

```
    "latency": {
        "Values": [
            309.4277237034415,
            323.12725941969757,
            326.64588457862067,
            344.8221530867399,
            520.3933272846809,
            531.7884573308439,
            537.579253961712,
            543.4331082335607,
            549.3507067990806,
            555.3327437881196,
            561.3799208891041,
            567.4929474313465,
            720.1774681373079,
            0
        ],
        "Counts": [
            1,
            1,
            1,
            1,
            1,
            3,
            4,
            2,
            2,
            3,
            1,
            1,
            1,
            22
        ],
        "Max": 720,
        "Min": 0,
        "Count": 44,
        "Sum": 11265
    }
```
  • Loading branch information
vastin committed Jun 14, 2023
1 parent f66845f commit a4dda5b
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 0 deletions.
20 changes: 20 additions & 0 deletions .chloggen/awsemf-exponential-histogram.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsemfexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add exponential histogram support.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [22626]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
96 changes: 96 additions & 0 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"fmt"
"math"
"strconv"
"time"

Expand Down Expand Up @@ -86,6 +87,13 @@ type histogramDataPointSlice struct {
pmetric.HistogramDataPointSlice
}

type exponentialHistogramDataPointSlice struct {
// TODO: Calculate delta value for count and sum value with exponential histogram
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
deltaMetricMetadata
pmetric.ExponentialHistogramDataPointSlice
}

// summaryDataPointSlice is a wrapper for pmetric.SummaryDataPointSlice
type summaryDataPointSlice struct {
deltaMetricMetadata
Expand Down Expand Up @@ -156,6 +164,88 @@ func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentati
}}, true
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool) ([]dataPoint, bool) {
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
} else {
bucketBegin = bucketEnd
}
bucketEnd = math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
}

// Set mid-point of negative buckets in values/counts array.
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.

negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

// CalculateDeltaDatapoints retrieves the SummaryDataPoint at the given index and perform calculation with sum and count while retain the quantile value.
func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.SummaryDataPointSlice.At(i)
Expand Down Expand Up @@ -263,6 +353,12 @@ func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Lo
metricMetadata,
metric.DataPoints(),
}
case pmetric.MetricTypeExponentialHistogram:
metric := pmd.ExponentialHistogram()
dps = exponentialHistogramDataPointSlice{
metricMetadata,
metric.DataPoints(),
}
case pmetric.MetricTypeSummary:
metric := pmd.Summary()
// For summaries coming from the prometheus receiver, the sum and count are cumulative, whereas for summaries
Expand Down
145 changes: 145 additions & 0 deletions exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,34 @@ func generateTestHistogramMetric(name string) pmetric.Metrics {
return otelMetrics
}

func generateTestExponentialHistogramMetric(name string) pmetric.Metrics {
otelMetrics := pmetric.NewMetrics()
rs := otelMetrics.ResourceMetrics().AppendEmpty()
metrics := rs.ScopeMetrics().AppendEmpty().Metrics()
metric := metrics.AppendEmpty()
metric.SetName(name)
metric.SetUnit("Seconds")
exponentialHistogramMetric := metric.SetEmptyExponentialHistogram()

exponentialHistogramDatapoint := exponentialHistogramMetric.DataPoints().AppendEmpty()
exponentialHistogramDatapoint.SetCount(4)
exponentialHistogramDatapoint.SetSum(0)
exponentialHistogramDatapoint.SetMin(-4)
exponentialHistogramDatapoint.SetMax(4)
exponentialHistogramDatapoint.SetZeroCount(0)
exponentialHistogramDatapoint.SetScale(1)
exponentialHistogramDatapoint.Positive().SetOffset(1)
exponentialHistogramDatapoint.Positive().BucketCounts().FromRaw([]uint64{
1, 0, 1,
})
exponentialHistogramDatapoint.Negative().SetOffset(1)
exponentialHistogramDatapoint.Negative().BucketCounts().FromRaw([]uint64{
1, 0, 1,
})
exponentialHistogramDatapoint.Attributes().PutStr("label1", "value1")
return otelMetrics
}

func generateTestSummaryMetric(name string) pmetric.Metrics {
otelMetrics := pmetric.NewMetrics()
rs := otelMetrics.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -347,6 +375,106 @@ func TestCalculateDeltaDatapoints_HistogramDataPointSlice(t *testing.T) {

}

func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing.T) {
deltaMetricMetadata := generateDeltaMetricMetadata(false, "foo", false)

testCases := []struct {
name string
histogramDPS pmetric.ExponentialHistogramDataPointSlice
expectedDatapoint dataPoint
}{
{
name: "Exponential histogram with min and max",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
histogramDP.SetCount(uint64(17))
histogramDP.SetSum(17.13)
histogramDP.SetMin(10)
histogramDP.SetMax(30)
histogramDP.Attributes().PutStr("label1", "value1")
return histogramDPS
}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{}, Counts: []float64{}, Sum: 17.13, Count: 17, Min: 10, Max: 30},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
},
},
{
name: "Exponential histogram without min and max",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
histogramDP.SetCount(uint64(17))
histogramDP.SetSum(17.13)
histogramDP.Attributes().PutStr("label1", "value1")
return histogramDPS

}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{}, Counts: []float64{}, Sum: 17.13, Count: 17, Min: 0, Max: 0},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
},
},
{
name: "Exponential histogram with buckets",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
histogramDP.Positive().BucketCounts().FromRaw([]uint64{1, 2, 3})
histogramDP.SetZeroCount(4)
histogramDP.Negative().BucketCounts().FromRaw([]uint64{1, 2, 3})
histogramDP.Attributes().PutStr("label1", "value1")
return histogramDPS
}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{1.5, 3, 6, 0, -1.5, -3, -6}, Counts: []float64{1, 2, 3, 4, 1, 2, 3}},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
},
},
{
name: "Exponential histogram with different scale/offset/labels",
histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice {
histogramDPS := pmetric.NewExponentialHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
histogramDP.SetScale(-1)
histogramDP.Positive().SetOffset(-1)
histogramDP.Positive().BucketCounts().FromRaw([]uint64{1, 2, 3})
histogramDP.SetZeroCount(4)
histogramDP.Negative().SetOffset(-1)
histogramDP.Negative().BucketCounts().FromRaw([]uint64{1, 2, 3})
histogramDP.Attributes().PutStr("label1", "value1")
histogramDP.Attributes().PutStr("label2", "value2")
return histogramDPS
}(),
expectedDatapoint: dataPoint{
name: "foo",
value: &cWMetricHistogram{Values: []float64{0.625, 2.5, 10, 0, -0.625, -2.5, -10}, Counts: []float64{1, 2, 3, 4, 1, 2, 3}},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1", "label2": "value2"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(_ *testing.T) {
// Given the histogram datapoints
exponentialHistogramDatapointSlice := exponentialHistogramDataPointSlice{deltaMetricMetadata, tc.histogramDPS}

// When calculate the delta datapoints for histograms
dps, retained := exponentialHistogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false)

// Then receiving the following datapoint with an expected length
assert.True(t, retained)
assert.Equal(t, 1, exponentialHistogramDatapointSlice.Len())
assert.Equal(t, tc.expectedDatapoint, dps[0])
})
}

}

func TestCalculateDeltaDatapoints_SummaryDataPointSlice(t *testing.T) {
for _, retainInitialValueOfDeltaMetric := range []bool{true, false} {
deltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", retainInitialValueOfDeltaMetric)
Expand Down Expand Up @@ -486,6 +614,13 @@ func TestGetDataPoints(t *testing.T) {
expectedDatapointSlice: histogramDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.HistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "ExponentialHistogram",
isPrometheusMetrics: false,
metric: generateTestExponentialHistogramMetric("foo"),
expectedDatapointSlice: exponentialHistogramDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.ExponentialHistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Summary from SDK",
isPrometheusMetrics: false,
Expand Down Expand Up @@ -540,6 +675,15 @@ func TestGetDataPoints(t *testing.T) {
assert.Equal(t, uint64(18), dp.Count())
assert.Equal(t, []float64{0, 10}, dp.ExplicitBounds().AsRaw())
assert.Equal(t, tc.expectedAttributes, dp.Attributes().AsRaw())
case exponentialHistogramDataPointSlice:
assert.Equal(t, 1, convertedDPS.Len())
dp := convertedDPS.ExponentialHistogramDataPointSlice.At(0)
assert.Equal(t, float64(0), dp.Sum())
assert.Equal(t, uint64(4), dp.Count())
assert.Equal(t, []uint64{1, 0, 1}, dp.Positive().BucketCounts().AsRaw())
assert.Equal(t, []uint64{1, 0, 1}, dp.Negative().BucketCounts().AsRaw())
assert.Equal(t, uint64(0), dp.ZeroCount())
assert.Equal(t, tc.expectedAttributes, dp.Attributes().AsRaw())
case summaryDataPointSlice:
expectedDPS := tc.expectedDatapointSlice.(summaryDataPointSlice)
assert.Equal(t, expectedDPS.deltaMetricMetadata, convertedDPS.deltaMetricMetadata)
Expand Down Expand Up @@ -587,6 +731,7 @@ func BenchmarkGetAndCalculateDeltaDataPoints(b *testing.B) {
generateTestGaugeMetric("int-gauge", intValueType),
generateTestGaugeMetric("int-gauge", doubleValueType),
generateTestHistogramMetric("histogram"),
generateTestExponentialHistogramMetric("exponential-histogram"),
generateTestSumMetric("int-sum", intValueType),
generateTestSumMetric("double-sum", doubleValueType),
generateTestSummaryMetric("summary"),
Expand Down
11 changes: 11 additions & 0 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ type cWMetricStats struct {
Sum float64
}

// The SampleCount of CloudWatch metrics will be calculated by the sum of the 'Counts' array.
// The 'Count' field should be same as the sum of the 'Counts' array and will be ignored in CloudWatch.
type cWMetricHistogram struct {
Values []float64
Counts []float64
Max float64
Min float64
Count uint64
Sum float64
}

type groupedMetricMetadata struct {
namespace string
timestampMs int64
Expand Down

0 comments on commit a4dda5b

Please sign in to comment.