Skip to content

Commit

Permalink
tanzuobservabilityexporter: use uber/atomic instead of sync/atomic (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#9814)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored May 10, 2022
1 parent 39a694e commit 901e67f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 46 deletions.
2 changes: 1 addition & 1 deletion exporter/tanzuobservabilityexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
)
Expand Down Expand Up @@ -38,7 +39,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
75 changes: 30 additions & 45 deletions exporter/tanzuobservabilityexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"fmt"
"math"
"strconv"
"sync/atomic"

"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/senders"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -180,37 +180,20 @@ type flushCloser interface {
Close()
}

// counter represents an internal counter metric. The zero value is ready to use
type counter struct {
count int64
}

// Report reports this counter to tanzu observability. name is the name of
// report the counter to tanzu observability. name is the name of
// the metric to be reported. tags is the tags for the metric. sender is what
// sends the metric to tanzu observability. Any errors get added to errs.
func (c *counter) Report(
name string, tags map[string]string, sender gaugeSender, errs *[]error,
) {
err := sender.SendMetric(name, float64(c.Get()), 0, "", tags)
func report(count *atomic.Int64, name string, tags map[string]string, sender gaugeSender, errs *[]error) {
err := sender.SendMetric(name, float64(count.Load()), 0, "", tags)
if err != nil {
*errs = append(*errs, err)
}
}

// Inc increments this counter by one.
func (c *counter) Inc() {
atomic.AddInt64(&c.count, 1)
}

// Get gets the value of this counter.
func (c *counter) Get() int64 {
return atomic.LoadInt64(&c.count)
}

// logMissingValue keeps track of metrics with missing values. metric is the
// metric with the missing value. settings logs the missing value. count counts
// metrics with missing values.
func logMissingValue(metric pmetric.Metric, settings component.TelemetrySettings, count *counter) {
func logMissingValue(metric pmetric.Metric, settings component.TelemetrySettings, count *atomic.Int64) {
namef := zap.String(metricNameString, metric.Name())
typef := zap.String(metricTypeString, metric.DataType().String())
settings.Logger.Debug("Metric missing value", namef, typef)
Expand Down Expand Up @@ -240,7 +223,7 @@ func pushGaugeNumberDataPoint(
errs *[]error,
sender gaugeSender,
settings component.TelemetrySettings,
missingValues *counter,
missingValues *atomic.Int64,
) {
tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey)
ts := numberDataPoint.Timestamp().AsTime().Unix()
Expand All @@ -263,16 +246,17 @@ type gaugeSender interface {
type gaugeConsumer struct {
sender gaugeSender
settings component.TelemetrySettings
missingValues counter
missingValues *atomic.Int64
}

// newGaugeConsumer returns a typedMetricConsumer that consumes gauge metrics
// by sending them to tanzu observability.
func newGaugeConsumer(
sender gaugeSender, settings component.TelemetrySettings) typedMetricConsumer {
return &gaugeConsumer{
sender: sender,
settings: settings,
sender: sender,
settings: settings,
missingValues: atomic.NewInt64(0),
}
}

Expand All @@ -290,27 +274,28 @@ func (g *gaugeConsumer) Consume(mi metricInfo, errs *[]error) {
errs,
g.sender,
g.settings,
&g.missingValues)
g.missingValues)
}
}

func (g *gaugeConsumer) PushInternalMetrics(errs *[]error) {
g.missingValues.Report(missingValueMetricName, typeIsGaugeTags, g.sender, errs)
report(g.missingValues, missingValueMetricName, typeIsGaugeTags, g.sender, errs)
}

type sumConsumer struct {
sender senders.MetricSender
settings component.TelemetrySettings
missingValues counter
missingValues *atomic.Int64
}

// newSumConsumer returns a typedMetricConsumer that consumes sum metrics
// by sending them to tanzu observability.
func newSumConsumer(
sender senders.MetricSender, settings component.TelemetrySettings) typedMetricConsumer {
return &sumConsumer{
sender: sender,
settings: settings,
sender: sender,
settings: settings,
missingValues: atomic.NewInt64(0),
}
}

Expand All @@ -330,20 +315,20 @@ func (s *sumConsumer) Consume(mi metricInfo, errs *[]error) {
s.pushNumberDataPoint(mi, numberDataPoints.At(i), errs)
} else {
pushGaugeNumberDataPoint(
mi, numberDataPoints.At(i), errs, s.sender, s.settings, &s.missingValues)
mi, numberDataPoints.At(i), errs, s.sender, s.settings, s.missingValues)
}
}
}

func (s *sumConsumer) PushInternalMetrics(errs *[]error) {
s.missingValues.Report(missingValueMetricName, typeIsSumTags, s.sender, errs)
report(s.missingValues, missingValueMetricName, typeIsSumTags, s.sender, errs)
}

func (s *sumConsumer) pushNumberDataPoint(mi metricInfo, numberDataPoint pmetric.NumberDataPoint, errs *[]error) {
tags := attributesToTagsForMetrics(numberDataPoint.Attributes(), mi.SourceKey)
value, err := getValue(numberDataPoint)
if err != nil {
logMissingValue(mi.Metric, s.settings, &s.missingValues)
logMissingValue(mi.Metric, s.settings, s.missingValues)
return
}
err = s.sender.SendDeltaCounter(mi.Name(), value, mi.Source, tags)
Expand All @@ -355,24 +340,28 @@ func (s *sumConsumer) pushNumberDataPoint(mi metricInfo, numberDataPoint pmetric
// histogramReporting takes care of logging and internal metrics for histograms
type histogramReporting struct {
settings component.TelemetrySettings
malformedHistograms counter
noAggregationTemporality counter
malformedHistograms *atomic.Int64
noAggregationTemporality *atomic.Int64
}

// newHistogramReporting returns a new histogramReporting instance.
func newHistogramReporting(settings component.TelemetrySettings) *histogramReporting {
return &histogramReporting{settings: settings}
return &histogramReporting{
settings: settings,
malformedHistograms: atomic.NewInt64(0),
noAggregationTemporality: atomic.NewInt64(0),
}
}

// Malformed returns the number of malformed histogram data points.
func (r *histogramReporting) Malformed() int64 {
return r.malformedHistograms.Get()
return r.malformedHistograms.Load()
}

// NoAggregationTemporality returns the number of histogram metrics that have no
// aggregation temporality.
func (r *histogramReporting) NoAggregationTemporality() int64 {
return r.noAggregationTemporality.Get()
return r.noAggregationTemporality.Load()
}

// LogMalformed logs seeing one malformed data point.
Expand All @@ -392,12 +381,8 @@ func (r *histogramReporting) LogNoAggregationTemporality(metric pmetric.Metric)
// Report sends the counts in this instance to wavefront.
// sender is what sends to wavefront. Any errors sending get added to errs.
func (r *histogramReporting) Report(sender gaugeSender, errs *[]error) {
r.malformedHistograms.Report(malformedHistogramMetricName, nil, sender, errs)
r.noAggregationTemporality.Report(
noAggregationTemporalityMetricName,
typeIsHistogramTags,
sender,
errs)
report(r.malformedHistograms, malformedHistogramMetricName, nil, sender, errs)
report(r.noAggregationTemporality, noAggregationTemporalityMetricName, typeIsHistogramTags, sender, errs)
}

type histogramConsumer struct {
Expand Down

0 comments on commit 901e67f

Please sign in to comment.