Skip to content

Commit

Permalink
Filter source resource attribute from tags.
Browse files Browse the repository at this point in the history
  • Loading branch information
keep94 committed Mar 17, 2022
1 parent 656c454 commit 7f0486e
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 27 deletions.
25 changes: 17 additions & 8 deletions exporter/tanzuobservabilityexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type metricsConsumer struct {

type metricInfo struct {
pdata.Metric
Source string
Source string
SourceKey string
}

// newMetricsConsumer returns a new metricsConsumer. consumers are the
Expand Down Expand Up @@ -102,13 +103,13 @@ func (c *metricsConsumer) Consume(ctx context.Context, md pdata.Metrics) error {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i).Resource().Attributes()
source := getSource(rm)
source, sourceKey := getSourceAndKey(rm)
ilms := rms.At(i).InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ms := ilms.At(j).Metrics()
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
mi := metricInfo{Metric: m, Source: source}
mi := metricInfo{Metric: m, Source: source, SourceKey: sourceKey}
select {
case <-ctx.Done():
return multierr.Combine(append(errs, errors.New("context canceled"))...)
Expand Down Expand Up @@ -240,7 +241,7 @@ func pushGaugeNumberDataPoint(
settings component.TelemetrySettings,
missingValues *counter,
) {
tags := attributesToTagsReplaceSource(numberDataPoint.Attributes())
tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey)
ts := numberDataPoint.Timestamp().AsTime().Unix()
value, err := getValue(numberDataPoint)
if err != nil {
Expand Down Expand Up @@ -338,7 +339,7 @@ func (s *sumConsumer) PushInternalMetrics(errs *[]error) {
}

func (s *sumConsumer) pushNumberDataPoint(mi metricInfo, numberDataPoint pdata.NumberDataPoint, errs *[]error) {
tags := attributesToTagsReplaceSource(numberDataPoint.Attributes())
tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey)
value, err := getValue(numberDataPoint)
if err != nil {
logMissingValue(mi.Metric, s.settings, &s.missingValues)
Expand Down Expand Up @@ -483,7 +484,7 @@ func (c *cumulativeHistogramDataPointConsumer) Consume(
reporting *histogramReporting,
) {
name := mi.Name()
tags := attributesToTagsReplaceSource(histogram.Attributes())
tags := attributesToTagsForMetrics(histogram.Attributes(), mi.SourceKey)
ts := histogram.Timestamp().AsTime().Unix()
explicitBounds := histogram.ExplicitBounds()
bucketCounts := histogram.BucketCounts()
Expand Down Expand Up @@ -529,7 +530,7 @@ func (d *deltaHistogramDataPointConsumer) Consume(
errs *[]error,
reporting *histogramReporting) {
name := mi.Name()
tags := attributesToTagsReplaceSource(his.Attributes())
tags := attributesToTagsForMetrics(his.Attributes(), mi.SourceKey)
ts := his.Timestamp().AsTime().Unix()
explicitBounds := his.ExplicitBounds()
bucketCounts := his.BucketCounts()
Expand Down Expand Up @@ -655,7 +656,7 @@ func (s *summaryConsumer) sendSummaryDataPoint(
) {
name := mi.Name()
ts := summaryDataPoint.Timestamp().AsTime().Unix()
tags := attributesToTagsReplaceSource(summaryDataPoint.Attributes())
tags := attributesToTagsForMetrics(summaryDataPoint.Attributes(), mi.SourceKey)
count := summaryDataPoint.Count()
sum := summaryDataPoint.Sum()

Expand Down Expand Up @@ -686,6 +687,14 @@ func (s *summaryConsumer) sendMetric(
}
}

func attributesToTagsForMetrics(
attributes pdata.AttributeMap, sourceKey string) map[string]string {
tags := attributesToTags(attributes)
delete(tags, sourceKey)
replaceSource(tags)
return tags
}

func quantileTagValue(quantile float64) string {
return strconv.FormatFloat(quantile, 'f', -1, 64)
}
Expand Down
20 changes: 14 additions & 6 deletions exporter/tanzuobservabilityexporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func TestMetricsConsumerNormalWithSourceTag(t *testing.T) {

assert.ElementsMatch(t, []string{"sum"}, mockSumConsumer.names)
assert.ElementsMatch(t, []string{"test_source"}, mockSumConsumer.sources)
assert.ElementsMatch(t, []string{"source"}, mockSumConsumer.sourceKeys)

assert.Equal(t, 1, mockSumConsumer.pushInternalMetricsCallCount)
assert.Equal(t, 1, sender.numFlushCalls)
Expand All @@ -88,6 +89,7 @@ func TestMetricsConsumerNormalWithHostnameTag(t *testing.T) {

assert.ElementsMatch(t, []string{"sum"}, mockSumConsumer.names)
assert.ElementsMatch(t, []string{"test_host.name"}, mockSumConsumer.sources)
assert.ElementsMatch(t, []string{"host.name"}, mockSumConsumer.sourceKeys)

assert.Equal(t, 1, mockSumConsumer.pushInternalMetricsCallCount)
assert.Equal(t, 1, sender.numFlushCalls)
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestGaugeConsumerHandleSourceTag(t *testing.T) {
addDataPoint(
0,
0,
map[string]interface{}{"source": "home"},
map[string]interface{}{"source": "home", "host.name": "test_source"},
dataPoints,
)
expected := []tobsMetric{
Expand Down Expand Up @@ -268,7 +270,7 @@ func TestGaugeConsumerMissingValue(t *testing.T) {
}

func createMetricInfo(metric pdata.Metric) metricInfo {
mi := metricInfo{Metric: metric, Source: "test_source"}
mi := metricInfo{Metric: metric, Source: "test_source", SourceKey: "host.name"}
return mi
}

Expand All @@ -283,7 +285,7 @@ func TestSumConsumerDeltaHandleSourceTag(t *testing.T) {
addDataPoint(
0,
0,
map[string]interface{}{"source": "home"},
map[string]interface{}{"source": "home", "host.name": "test_source"},
dataPoints,
)
sender := &mockSumSender{}
Expand Down Expand Up @@ -669,7 +671,9 @@ func TestCumulativeHistogramDataPointConsumer(t *testing.T) {
// Creates bounds of -Inf to <=2.0; >2.0 to <=5.0; >5.0 to <=10.0; >10.0 to +Inf
histogramDataPoint.SetExplicitBounds([]float64{2.0, 5.0, 10.0})
histogramDataPoint.SetBucketCounts([]uint64{5, 1, 3, 2})
setTags(map[string]interface{}{"foo": "bar", "source": "home"}, histogramDataPoint.Attributes())
setTags(
map[string]interface{}{"foo": "bar", "source": "home", "host.name": "test_source"},
histogramDataPoint.Attributes())
sender := &mockGaugeSender{}
report := newHistogramReporting(componenttest.NewNopTelemetrySettings())
consumer := newCumulativeHistogramDataPointConsumer(sender)
Expand Down Expand Up @@ -788,7 +792,7 @@ func TestDeltaHistogramDataPointConsumer(t *testing.T) {
histogramDataPoint.SetBucketCounts([]uint64{5, 1, 3, 2})
setDataPointTimestamp(1631234567, histogramDataPoint)
setTags(
map[string]interface{}{"bar": "baz", "source": "home"},
map[string]interface{}{"bar": "baz", "source": "home", "host.name": "test_source"},
histogramDataPoint.Attributes())
sender := &mockDistributionSender{}
report := newHistogramReporting(componenttest.NewNopTelemetrySettings())
Expand Down Expand Up @@ -902,7 +906,9 @@ func TestSummaries(t *testing.T) {

dataPoint = dataPoints.AppendEmpty()
setQuantileValues(dataPoint, 0.2, 75.0, 0.5, 125.0, 0.8, 175.0, 0.95, 225.0)
setTags(map[string]interface{}{"bar": "baz", "source": "home"}, dataPoint.Attributes())
setTags(
map[string]interface{}{"bar": "baz", "source": "home", "host.name": "test_source"},
dataPoint.Attributes())
dataPoint.SetCount(15)
dataPoint.SetSum(3000.0)
setDataPointTimestamp(1645123556, dataPoint)
Expand Down Expand Up @@ -1375,6 +1381,7 @@ type mockTypedMetricConsumer struct {
errorOnPushInternalMetrics bool
names []string
sources []string
sourceKeys []string
pushInternalMetricsCallCount int
}

Expand All @@ -1385,6 +1392,7 @@ func (m *mockTypedMetricConsumer) Type() pdata.MetricDataType {
func (m *mockTypedMetricConsumer) Consume(mi metricInfo, errs *[]error) {
m.names = append(m.names, mi.Name())
m.sources = append(m.sources, mi.Source)
m.sourceKeys = append(m.sourceKeys, mi.SourceKey)
if m.errorOnConsume {
*errs = append(*errs, errors.New("error in consume"))
}
Expand Down
31 changes: 20 additions & 11 deletions exporter/tanzuobservabilityexporter/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,37 @@ func (t *traceTransformer) Span(orig pdata.Span) (span, error) {
}, nil
}

func getSourceAndResourceTags(attributes pdata.AttributeMap) (string, map[string]string) {
candidateKeys := []string{labelSource, conventions.AttributeHostName, "hostname", conventions.AttributeHostID}

func getSourceAndResourceTagsAndSourceKey(attributes pdata.AttributeMap) (
string, map[string]string, string) {
attributesWithoutSource := map[string]string{}
var source string

attributes.Range(func(k string, v pdata.AttributeValue) bool {
attributesWithoutSource[k] = v.AsString()
return true
})

candidateKeys := []string{labelSource, conventions.AttributeHostName, "hostname", conventions.AttributeHostID}
var source string
var sourceKey string
for _, key := range candidateKeys {
if value, isFound := attributesWithoutSource[key]; isFound {
source = value
sourceKey = key
delete(attributesWithoutSource, key)
break
}
}

//returning an empty source is fine as wavefront.go.sdk will set it up to a default value(os.hostname())
return source, attributesWithoutSource, sourceKey
}

func getSourceAndResourceTags(attributes pdata.AttributeMap) (string, map[string]string) {
source, attributesWithoutSource, _ := getSourceAndResourceTagsAndSourceKey(attributes)
return source, attributesWithoutSource
}

func getSource(attributes pdata.AttributeMap) string {
result, _ := getSourceAndResourceTags(attributes)
return result
func getSourceAndKey(attributes pdata.AttributeMap) (string, string) {
source, _, sourceKey := getSourceAndResourceTagsAndSourceKey(attributes)
return source, sourceKey
}

func spanKind(span pdata.Span) string {
Expand Down Expand Up @@ -207,12 +212,16 @@ func attributesToTags(attributes ...pdata.AttributeMap) map[string]string {
return tags
}

func attributesToTagsReplaceSource(attributes ...pdata.AttributeMap) map[string]string {
tags := attributesToTags(attributes...)
func replaceSource(tags map[string]string) {
if value, isFound := tags[labelSource]; isFound {
delete(tags, labelSource)
tags["_source"] = value
}
}

func attributesToTagsReplaceSource(attributes ...pdata.AttributeMap) map[string]string {
tags := attributesToTags(attributes...)
replaceSource(tags)
return tags
}

Expand Down
16 changes: 14 additions & 2 deletions exporter/tanzuobservabilityexporter/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,24 @@ func TestGetSourceAndResourceTags(t *testing.T) {
}
}

func TestGetSource(t *testing.T) {
func TestGetSourceAndKey(t *testing.T) {
resAttrs := pdata.NewAttributeMap()
resAttrs.InsertString(labelSource, "some_source")
resAttrs.InsertString(conventions.AttributeHostName, "test_host.name")

assert.Equal(t, "some_source", getSource(resAttrs))
source, sourceKey := getSourceAndKey(resAttrs)
assert.Equal(t, "some_source", source)
assert.Equal(t, labelSource, sourceKey)
}

func TestGetSourceAndKeyNotFound(t *testing.T) {
resAttrs := pdata.NewAttributeMap()
resAttrs.InsertString("foo", "some_source")
resAttrs.InsertString("bar", "test_host.name")

source, sourceKey := getSourceAndKey(resAttrs)
assert.Equal(t, "", source)
assert.Equal(t, "", sourceKey)
}

func TestAttributesToTagsReplaceSource(t *testing.T) {
Expand Down

0 comments on commit 7f0486e

Please sign in to comment.