Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set Counter to be monotonic #4154

Merged
merged 4 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ The Following settings are optional:

- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

- `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true all the time? Do we need to have per counter config?

Copy link
Contributor Author

@locmai locmai Jul 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have per counter config?

With the current change, it applies to all counters.

I think we do need for per counter config, we could add include/exclude with regex? Although it feels more like a processor job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In previous PRs on this receiver I've asked that we follow the Prometheus statsd exporter's configuration, e.g.,
https://github.com/prometheus/statsd_exporter#glob-matching

I think it is fine to add this to a TODO and continue in another PR. The current PR allows a default choice, and I would say that probably the default should be true (i.e., assume monotonic by default).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would be great if this receiver were following the statsd exporter 👍🏼

May I have this one merged until that come? I set the default to false since seeing this #1789 so I don't want to cause any trouble for the DataDog exporter.


- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.


Expand All @@ -39,6 +41,7 @@ receivers:
endpoint: "localhost:8127"
aggregation_interval: 70s
enable_metric_type: true
is_monotonic_counter: false
timer_histogram_mapping:
- statsd_type: "histogram"
observer_type: "gauge"
Expand Down Expand Up @@ -115,6 +118,7 @@ receivers:
endpoint: "localhost:8125" # default
aggregation_interval: 60s # default
enable_metric_type: false # default
is_monotonic_counter: false # default
timer_histogram_mapping:
- statsd_type: "histogram"
observer_type: "gauge"
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"`
TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"`
}

Expand Down
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
defaultTransport = "udp"
defaultAggregationInterval = 60 * time.Second
defaultEnableMetricType = false
defaultIsMonotonicCounter = false
)

var (
Expand All @@ -58,6 +59,7 @@ func createDefaultConfig() config.Receiver {
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
IsMonotonicCounter: defaultIsMonotonicCounter,
TimerHistogramMapping: defaultTimerHistogramMapping,
}
}
Expand Down
7 changes: 6 additions & 1 deletion receiver/statsdreceiver/protocol/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

func buildCounterMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
func buildCounterMetric(parsedMetric statsDMetric, isMonotonicCounter bool, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
ilm := pdata.NewInstrumentationLibraryMetrics()
nm := ilm.Metrics().AppendEmpty()
nm.SetName(parsedMetric.description.name)
if parsedMetric.unit != "" {
nm.SetUnit(parsedMetric.unit)
}
nm.SetDataType(pdata.MetricDataTypeIntSum)

nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
if isMonotonicCounter {
nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
}

nm.IntSum().SetIsMonotonic(true)

dp := nm.IntSum().DataPoints().AppendEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func TestBuildCounterMetric(t *testing.T) {
labelKeys: []string{"mykey"},
labelValues: []string{"myvalue"},
}
metric := buildCounterMetric(parsedMetric, timeNow)
isMonotonicCounter := false
metric := buildCounterMetric(parsedMetric, isMonotonicCounter, timeNow)
expectedMetrics := pdata.NewInstrumentationLibraryMetrics()
expectedMetric := expectedMetrics.Metrics().AppendEmpty()
expectedMetric.SetName("testCounter")
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize(enableMetricType bool, sendTimerHistogram []TimerHistogramMapping) error
Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
GetMetrics() pdata.Metrics
Aggregate(line string) error
}
8 changes: 5 additions & 3 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type StatsDParser struct {
summaries map[statsDMetricdescription]summaryMetric
timersAndDistributions []pdata.InstrumentationLibraryMetrics
enableMetricType bool
isMonotonicCounter bool
observeTimer string
observeHistogram string
}
Expand Down Expand Up @@ -84,13 +85,14 @@ type statsDMetricdescription struct {
labels attribute.Distinct
}

func (p *StatsDParser) Initialize(enableMetricType bool, sendTimerHistogram []TimerHistogramMapping) error {
func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)
p.summaries = make(map[statsDMetricdescription]summaryMetric)

p.enableMetricType = enableMetricType
p.isMonotonicCounter = isMonotonicCounter
for _, eachMap := range sendTimerHistogram {
switch eachMap.StatsdType {
case "histogram":
Expand Down Expand Up @@ -159,11 +161,11 @@ func (p *StatsDParser) Aggregate(line string) error {
case statsdCounter:
_, ok := p.counters[parsedMetric.description]
if !ok {
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc())
} else {
savedValue := p.counters[parsedMetric.description].Metrics().At(0).IntSum().DataPoints().At(0).Value()
parsedMetric.intvalue = parsedMetric.intvalue + savedValue
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc())
}

case statsdHistogram:
Expand Down
73 changes: 60 additions & 13 deletions receiver/statsdreceiver/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -676,9 +676,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -704,9 +704,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 215, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 215, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 75, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 75, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -732,7 +732,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand Down Expand Up @@ -790,17 +790,64 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), time.Unix(711, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), time.Unix(711, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.Equal(t, tt.expectedGauges, p.gauges)
assert.Equal(t, tt.expectedCounters, p.counters)
}
})
}
}

func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
timeNowFunc = func() time.Time {
return time.Unix(711, 0)
}

tests := []struct {
name string
input []string
expectedGauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
expectedCounters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
err error
}{
{
name: "counter with increment and sample rate",
input: []string{
"statsdTestMetric1:3000|c|#mykey:myvalue",
"statsdTestMetric1:4000|c|#mykey:myvalue",
"statsdTestMetric2:20|c|@0.8|#mykey:myvalue",
"statsdTestMetric2:20|c|@0.8|#mykey:myvalue",
},
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand Down Expand Up @@ -891,7 +938,7 @@ func TestStatsDParser_AggregateTimmerWithSummary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})
p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand All @@ -906,7 +953,7 @@ func TestStatsDParser_AggregateTimmerWithSummary(t *testing.T) {

func TestStatsDParser_Initialize(t *testing.T) {
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
labels := attribute.Distinct{}
teststatsdDMetricdescription := statsDMetricdescription{
name: "test",
Expand All @@ -920,7 +967,7 @@ func TestStatsDParser_Initialize(t *testing.T) {

func TestStatsDParser_GetMetrics(t *testing.T) {
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.gauges[testDescription("statsdTestMetric1", "g",
[]string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] =
buildGaugeMetric(testStatsDMetric("testGauge1", "", 0, 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0))
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
ctx, r.cancel = context.WithCancel(ctx)
var transferChan = make(chan string, 10)
ticker := time.NewTicker(r.config.AggregationInterval)
r.parser.Initialize(r.config.EnableMetricType, r.config.TimerHistogramMapping)
r.parser.Initialize(r.config.EnableMetricType, r.config.IsMonotonicCounter, r.config.TimerHistogramMapping)
go func() {
if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil {
host.ReportFatalError(err)
Expand Down