Skip to content

Commit

Permalink
Add option to set Counter to be monotonic (#4154)
Browse files Browse the repository at this point in the history
* Add option to set Counter to be monotonic

* update tests

* update tests 2

* update tests 3
  • Loading branch information
locmai committed Jul 20, 2021
1 parent c7d94c3 commit 57676a9
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 20 deletions.
4 changes: 4 additions & 0 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ The Following settings are optional:

- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.

- `is_monotonic_counter` (default value is false): Set all counter-type metrics the statsd receiver received as monotonic.

- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.


Expand All @@ -39,6 +41,7 @@ receivers:
endpoint: "localhost:8127"
aggregation_interval: 70s
enable_metric_type: true
is_monotonic_counter: false
timer_histogram_mapping:
- statsd_type: "histogram"
observer_type: "gauge"
Expand Down Expand Up @@ -115,6 +118,7 @@ receivers:
endpoint: "localhost:8125" # default
aggregation_interval: 60s # default
enable_metric_type: false # default
is_monotonic_counter: false # default
timer_histogram_mapping:
- statsd_type: "histogram"
observer_type: "gauge"
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"`
TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"`
}

Expand Down
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
defaultTransport = "udp"
defaultAggregationInterval = 60 * time.Second
defaultEnableMetricType = false
defaultIsMonotonicCounter = false
)

var (
Expand All @@ -58,6 +59,7 @@ func createDefaultConfig() config.Receiver {
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
IsMonotonicCounter: defaultIsMonotonicCounter,
TimerHistogramMapping: defaultTimerHistogramMapping,
}
}
Expand Down
7 changes: 6 additions & 1 deletion receiver/statsdreceiver/protocol/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)

func buildCounterMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
func buildCounterMetric(parsedMetric statsDMetric, isMonotonicCounter bool, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
ilm := pdata.NewInstrumentationLibraryMetrics()
nm := ilm.Metrics().AppendEmpty()
nm.SetName(parsedMetric.description.name)
if parsedMetric.unit != "" {
nm.SetUnit(parsedMetric.unit)
}
nm.SetDataType(pdata.MetricDataTypeIntSum)

nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
if isMonotonicCounter {
nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
}

nm.IntSum().SetIsMonotonic(true)

dp := nm.IntSum().DataPoints().AppendEmpty()
Expand Down
3 changes: 2 additions & 1 deletion receiver/statsdreceiver/protocol/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func TestBuildCounterMetric(t *testing.T) {
labelKeys: []string{"mykey"},
labelValues: []string{"myvalue"},
}
metric := buildCounterMetric(parsedMetric, timeNow)
isMonotonicCounter := false
metric := buildCounterMetric(parsedMetric, isMonotonicCounter, timeNow)
expectedMetrics := pdata.NewInstrumentationLibraryMetrics()
expectedMetric := expectedMetrics.Metrics().AppendEmpty()
expectedMetric.SetName("testCounter")
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize(enableMetricType bool, sendTimerHistogram []TimerHistogramMapping) error
Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error
GetMetrics() pdata.Metrics
Aggregate(line string) error
}
8 changes: 5 additions & 3 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type StatsDParser struct {
summaries map[statsDMetricdescription]summaryMetric
timersAndDistributions []pdata.InstrumentationLibraryMetrics
enableMetricType bool
isMonotonicCounter bool
observeTimer string
observeHistogram string
}
Expand Down Expand Up @@ -84,13 +85,14 @@ type statsDMetricdescription struct {
labels attribute.Distinct
}

func (p *StatsDParser) Initialize(enableMetricType bool, sendTimerHistogram []TimerHistogramMapping) error {
func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error {
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)
p.summaries = make(map[statsDMetricdescription]summaryMetric)

p.enableMetricType = enableMetricType
p.isMonotonicCounter = isMonotonicCounter
for _, eachMap := range sendTimerHistogram {
switch eachMap.StatsdType {
case "histogram":
Expand Down Expand Up @@ -159,11 +161,11 @@ func (p *StatsDParser) Aggregate(line string) error {
case statsdCounter:
_, ok := p.counters[parsedMetric.description]
if !ok {
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc())
} else {
savedValue := p.counters[parsedMetric.description].Metrics().At(0).IntSum().DataPoints().At(0).Value()
parsedMetric.intvalue = parsedMetric.intvalue + savedValue
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter, timeNowFunc())
}

case statsdHistogram:
Expand Down
73 changes: 60 additions & 13 deletions receiver/statsdreceiver/protocol/statsd_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -676,9 +676,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -704,9 +704,9 @@ func TestStatsDParser_Aggregate(t *testing.T) {
},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 215, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 215, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 75, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), time.Unix(711, 0)),
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 75, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), false, time.Unix(711, 0)),
},
expectedTimer: []pdata.InstrumentationLibraryMetrics{},
},
Expand All @@ -732,7 +732,7 @@ func TestStatsDParser_Aggregate(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand Down Expand Up @@ -790,17 +790,64 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) {
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), time.Unix(711, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), time.Unix(711, 0)),
[]string{"mykey", "metric_type"}, []string{"myvalue", "counter"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "counter"}), false, time.Unix(711, 0)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
if tt.err != nil {
assert.Equal(t, tt.err, err)
} else {
assert.Equal(t, tt.expectedGauges, p.gauges)
assert.Equal(t, tt.expectedCounters, p.counters)
}
})
}
}

func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) {
timeNowFunc = func() time.Time {
return time.Unix(711, 0)
}

tests := []struct {
name string
input []string
expectedGauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
expectedCounters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
err error
}{
{
name: "counter with increment and sample rate",
input: []string{
"statsdTestMetric1:3000|c|#mykey:myvalue",
"statsdTestMetric1:4000|c|#mykey:myvalue",
"statsdTestMetric2:20|c|@0.8|#mykey:myvalue",
"statsdTestMetric2:20|c|@0.8|#mykey:myvalue",
},
expectedGauges: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{},
expectedCounters: map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics{
testDescription("statsdTestMetric1", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric1", "", 7000, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0)),
testDescription("statsdTestMetric2", "c",
[]string{"mykey"}, []string{"myvalue"}): buildCounterMetric(testStatsDMetric("statsdTestMetric2", "", 50, 0, false, "c", 0, []string{"mykey"}, []string{"myvalue"}), true, time.Unix(711, 0)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand Down Expand Up @@ -891,7 +938,7 @@ func TestStatsDParser_AggregateTimmerWithSummary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var err error
p := &StatsDParser{}
p.Initialize(false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})
p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})
for _, line := range tt.input {
err = p.Aggregate(line)
}
Expand All @@ -906,7 +953,7 @@ func TestStatsDParser_AggregateTimmerWithSummary(t *testing.T) {

func TestStatsDParser_Initialize(t *testing.T) {
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
labels := attribute.Distinct{}
teststatsdDMetricdescription := statsDMetricdescription{
name: "test",
Expand All @@ -920,7 +967,7 @@ func TestStatsDParser_Initialize(t *testing.T) {

func TestStatsDParser_GetMetrics(t *testing.T) {
p := &StatsDParser{}
p.Initialize(true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})
p.gauges[testDescription("statsdTestMetric1", "g",
[]string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] =
buildGaugeMetric(testStatsDMetric("testGauge1", "", 0, 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0))
Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error {
ctx, r.cancel = context.WithCancel(ctx)
var transferChan = make(chan string, 10)
ticker := time.NewTicker(r.config.AggregationInterval)
r.parser.Initialize(r.config.EnableMetricType, r.config.TimerHistogramMapping)
r.parser.Initialize(r.config.EnableMetricType, r.config.IsMonotonicCounter, r.config.TimerHistogramMapping)
go func() {
if err := r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan); err != nil {
host.ReportFatalError(err)
Expand Down

0 comments on commit 57676a9

Please sign in to comment.