Skip to content

Commit

Permalink
[processor/metricstransform] Do not produce empty metrics (open-telem…
Browse files Browse the repository at this point in the history
…etry#12211)

* Do not produce empty metrics

Some transformations can leave metrics with no data points. Those metrics can be considered invalid and should not be generated. This change updates metricstransform processor to not produce empty metrics.

* add a comment
  • Loading branch information
dmitryax committed Jul 11, 2022
1 parent 5bd24f4 commit 100d245
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,16 @@ func (mtp *metricsTransformProcessor) processOTLPMetrics(md pmetric.Metrics) (pm
metrics := sm.Metrics()

for _, transform := range mtp.transforms {
if transform.Action == Group {
switch transform.Action {
case Group:
extractedMetrics := extractAndRemoveMatchedMetrics(transform.MetricIncludeFilter, metrics)
groupMatchedMetrics(rm.Resource(), sm.Scope(), extractedMetrics,
transform).CopyTo(groupedRMs.AppendEmpty())
}

matchedMetrics := matchMetrics(transform.MetricIncludeFilter, metrics)
if len(matchedMetrics) == 0 {
continue
}

if transform.Action == Combine {
case Combine:
matchedMetrics := matchMetrics(transform.MetricIncludeFilter, metrics)
if len(matchedMetrics) == 0 {
continue
}

if err := canBeCombined(matchedMetrics); err != nil {
// TODO: report via trace / metric instead
Expand All @@ -267,22 +265,37 @@ func (mtp *metricsTransformProcessor) processOTLPMetrics(md pmetric.Metrics) (pm
}

extractedMetrics := extractAndRemoveMatchedMetrics(transform.MetricIncludeFilter, metrics)
combinedMetric := metrics.AppendEmpty()
combine(transform, extractedMetrics).MoveTo(combinedMetric)

// set matchedMetrics to the combined metric so that any additional operations are performed on
// the combined metric
matchedMetrics = []pmetric.Metric{combinedMetric}
}

for _, metric := range matchedMetrics {
if transform.Action == Insert {
matchedMetric := transform.MetricIncludeFilter.extractMatchedMetric(metric)
newMetric := metrics.AppendEmpty()
matchedMetric.CopyTo(newMetric)
metric = newMetric
combinedMetric := combine(transform, extractedMetrics)
if transformMetric(combinedMetric, transform) {
combinedMetric.MoveTo(metrics.AppendEmpty())
}
case Insert:
newMetrics := pmetric.NewMetricSlice()
newMetrics.EnsureCapacity(metrics.Len())
for i := 0; i < metrics.Len(); i++ {
metric := metrics.At(i)
newMetric := transform.MetricIncludeFilter.extractMatchedMetric(metric)
if newMetric == (pmetric.Metric{}) {
continue
}
if newMetric == metric {
newMetric = pmetric.NewMetric()
metric.CopyTo(newMetric)
}
if transformMetric(newMetric, transform) {
newMetric.MoveTo(newMetrics.AppendEmpty())
}
}
transformMetric(metric, transform)
newMetrics.MoveAndAppendTo(metrics)
case Update:
metrics.RemoveIf(func(metric pmetric.Metric) bool {
if !transform.MetricIncludeFilter.matchMetric(metric) {
return false
}

// Drop the metric if all the data points were dropped after transformations.
return !transformMetric(metric, transform)
})
}
}

Expand Down Expand Up @@ -502,8 +515,27 @@ func rangeDataPointAttributes(metric pmetric.Metric, f func(pcommon.Map) bool) {
}
}

// transformMetric updates the metric content based on operations indicated in transform.
func transformMetric(metric pmetric.Metric, transform internalTransform) {
func countDataPoints(metric pmetric.Metric) int {
switch metric.DataType() {
case pmetric.MetricDataTypeGauge:
return metric.Gauge().DataPoints().Len()
case pmetric.MetricDataTypeSum:
return metric.Sum().DataPoints().Len()
case pmetric.MetricDataTypeHistogram:
return metric.Histogram().DataPoints().Len()
case pmetric.MetricDataTypeExponentialHistogram:
return metric.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricDataTypeSummary:
return metric.Summary().DataPoints().Len()
}
return 0
}

// transformMetric updates the metric content based on operations indicated in transform and returns a flag
// specifying whether the metric is valid after applying the translations,
// e.g. false is returned if all the data points were removed after applying the translations.
func transformMetric(metric pmetric.Metric, transform internalTransform) bool {
isMetricEmpty := countDataPoints(metric) == 0
canChangeMetric := transform.Action != Update || matchAllDps(metric, transform.MetricIncludeFilter)

if transform.NewName != "" && canChangeMetric {
Expand Down Expand Up @@ -540,4 +572,7 @@ func transformMetric(metric pmetric.Metric, transform internalTransform) {
}
}
}

// Consider metric invalid if all its data points were removed after applying the operations.
return isMetricEmpty || countDataPoints(metric) > 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func TestMetricsTransformProcessor(t *testing.T) {
for _, useOTLP := range []bool{false, true} {
for _, test := range standardTests {
t.Run(test.name, func(t *testing.T) {
if !useOTLP && test.spipOCTest {
return
}

next := new(consumertest.MetricsSink)

p := &metricsTransformProcessor{
Expand Down Expand Up @@ -65,7 +69,10 @@ func TestMetricsTransformProcessor(t *testing.T) {
// get and check results
got := next.AllMetrics()
require.Equal(t, 1, len(got))
_, _, actualOutMetrics := internaldata.ResourceMetricsToOC(got[0].ResourceMetrics().At(0))
actualOutMetrics := []*metricspb.Metric{}
if got[0].ResourceMetrics().Len() > 0 {
_, _, actualOutMetrics = internaldata.ResourceMetricsToOC(got[0].ResourceMetrics().At(0))
}
require.Equal(t, len(test.out), len(actualOutMetrics))

for idx, out := range test.out {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type metricsTransformTest struct {
name string // test name
transforms []internalTransform
spipOCTest bool // skip test on the deprecated OpenCensus data model
in []*metricspb.Metric
out []*metricspb.Metric
}
Expand Down Expand Up @@ -2078,5 +2079,32 @@ var (
build(),
},
},
{
name: "delete_all_metric_datapoints",
spipOCTest: true,
transforms: []internalTransform{
{
MetricIncludeFilter: internalFilterStrict{include: "metric"},
Action: Update,
Operations: []internalOperation{
{
configOperation: Operation{
Action: DeleteLabelValue,
Label: "label1",
LabelValue: "label1value1",
},
},
},
},
},
in: []*metricspb.Metric{
metricBuilder().setName("metric").setLabels([]string{"label1", "label2"}).
setDataType(metricspb.MetricDescriptor_GAUGE_INT64).
addTimeseries(1, []string{"label1value1", "label2value"}).
addInt64Point(0, 3, 2).
build(),
},
out: []*metricspb.Metric{},
},
}
)
4 changes: 4 additions & 0 deletions unreleased/metric-transform-remov-empty-metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: enhancement
component: metricstransformprocessor
note: Do not produce empty metrics
issues: [12210]

0 comments on commit 100d245

Please sign in to comment.