From 763713b04dae16bd4a63f330246b7853acef7d9f Mon Sep 17 00:00:00 2001 From: "Gavin Zhang (Kunyuan Zhang)" <31523962+gavindoudou@users.noreply.github.com> Date: Tue, 15 Dec 2020 14:02:06 -0800 Subject: [PATCH] [statsdreceiver]add aggregation for StatsD receiver (#1670) * add aggregation for StatsD receiver, support metrics types: gauge and counter * revise readme for StatsD receiver * Use Distinct in OTel-Go SDK as the description field for statsDMetric --- receiver/statsdreceiver/README.md | 41 +- receiver/statsdreceiver/config.go | 3 + receiver/statsdreceiver/config_test.go | 2 + receiver/statsdreceiver/factory.go | 9 +- receiver/statsdreceiver/factory_test.go | 12 + receiver/statsdreceiver/go.mod | 1 + receiver/statsdreceiver/protocol/parser.go | 4 +- .../statsdreceiver/protocol/statsd_parser.go | 225 +++-- .../protocol/statsd_parser_test.go | 950 +++++++++++++++--- receiver/statsdreceiver/receiver.go | 43 +- receiver/statsdreceiver/receiver_test.go | 60 +- receiver/statsdreceiver/testdata/config.yaml | 1 + receiver/statsdreceiver/transport/server.go | 1 + .../statsdreceiver/transport/server_test.go | 18 +- .../statsdreceiver/transport/udp_server.go | 28 +- 15 files changed, 1088 insertions(+), 310 deletions(-) diff --git a/receiver/statsdreceiver/README.md b/receiver/statsdreceiver/README.md index 5acb9e7fd8d4a..a3731de2b1bd4 100644 --- a/receiver/statsdreceiver/README.md +++ b/receiver/statsdreceiver/README.md @@ -1,6 +1,6 @@ # StatsD Receiver -StatsD receiver for ingesting StatsD messages into the OpenTelemetry Collector. +StatsD receiver for ingesting StatsD messages(https://github.com/statsd/statsd/blob/master/docs/metric_types.md) into the OpenTelemetry Collector. Supported pipeline types: metrics @@ -12,6 +12,11 @@ The following settings are required: - `endpoint` (default = `localhost:8125`): Address and port to listen on. + +The Following settings are optional: + +- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server) + Example: ```yaml @@ -19,6 +24,7 @@ receivers: statsd: statsd/2: endpoint: "localhost:8127" + aggregation_interval: 70s ``` The full list of settings exposed for this receiver are documented [here](./config.go) @@ -26,13 +32,26 @@ with detailed sample configurations [here](./testdata/config.yaml). ## Aggregation -Currently the `statsdreceiver` is not providing any aggregation. There are -ideas such as the [Metrics Transform Processor -Proposal](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/332) -that intend to enable control over Metric aggregation in a processor. - -An alternative will be to implement some simple aggregation in this receiver. - +Aggregation is done in statsD receiver. The default aggregation interval is 60s. The receiver only aggregates the metrics with the same metric name, metric type, label keys and label values. After each aggregation interval, the receiver will send all metrics (after aggregation) in this aggregation interval to the following workflow. + +It supports: + +Gauge(transferred to double): +- statsdTestMetric1:500|g|#mykey:myvalue +statsdTestMetric1:400|g|#mykey:myvalue +(get the latest value: 400) +- statsdTestMetric1:500|g|#mykey:myvalue +statsdTestMetric1:+2|g|#mykey:myvalue +statsdTestMetric1:-1|g|#mykey:myvalue +(get the value after calculation: 501) + +Counter(transferred to int): +- statsdTestMetric1:3000|c|#mykey:myvalue +statsdTestMetric1:4000|c|#mykey:myvalue +(get the value after incrementation: 7000) +- statsdTestMetric1:3000|c|#mykey:myvalue +statsdTestMetric1:20|c|@0.8|#mykey:myvalue +(get the value after incrementation with sample rate: 3000+20/0.8=3025) ## Metrics General format is: @@ -43,14 +62,15 @@ General format is: `:|c|@|#:` +it supports sample rate + ### Gauge `:|g|@|#:` ### Timer -`:|ms|@|#:` - +TODO: add support for timer ## Testing @@ -60,6 +80,7 @@ General format is: receivers: statsd: endpoint: "localhost:8125" # default + aggregation_interval: 60s # default exporters: file: diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index 409af2d5a796d..fabf1f2cb614e 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -15,6 +15,8 @@ package statsdreceiver import ( + "time" + "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/confignet" ) @@ -23,4 +25,5 @@ import ( type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` NetAddr confignet.NetAddr `mapstructure:",squash"` + AggregationInterval time.Duration `mapstructure:"aggregation_interval"` } diff --git a/receiver/statsdreceiver/config_test.go b/receiver/statsdreceiver/config_test.go index 4bed6418a4d95..7ba117b97b55f 100644 --- a/receiver/statsdreceiver/config_test.go +++ b/receiver/statsdreceiver/config_test.go @@ -17,6 +17,7 @@ package statsdreceiver import ( "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,5 +55,6 @@ func TestLoadConfig(t *testing.T) { Endpoint: "localhost:12345", Transport: "custom_transport", }, + AggregationInterval: 70 * time.Second, }, r1) } diff --git a/receiver/statsdreceiver/factory.go b/receiver/statsdreceiver/factory.go index 556775c352b32..6adba19a38daf 100644 --- a/receiver/statsdreceiver/factory.go +++ b/receiver/statsdreceiver/factory.go @@ -16,6 +16,7 @@ package statsdreceiver import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" @@ -26,9 +27,10 @@ import ( const ( // The value of "type" key in configuration. - typeStr = "statsd" - defaultBindEndpoint = "localhost:8125" - defaultTransport = "udp" + typeStr = "statsd" + defaultBindEndpoint = "localhost:8125" + defaultTransport = "udp" + defaultAggregationInterval = 60 * time.Second ) // NewFactory creates a factory for the StatsD receiver. @@ -50,6 +52,7 @@ func createDefaultConfig() configmodels.Receiver { Endpoint: defaultBindEndpoint, Transport: defaultTransport, }, + AggregationInterval: defaultAggregationInterval, } } diff --git a/receiver/statsdreceiver/factory_test.go b/receiver/statsdreceiver/factory_test.go index 1ba2a2c48bee8..1c98fa3c974b8 100644 --- a/receiver/statsdreceiver/factory_test.go +++ b/receiver/statsdreceiver/factory_test.go @@ -41,3 +41,15 @@ func TestCreateReceiver(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, tReceiver, "receiver creation failed") } + +func TestCreateMetricsReceiverWithNilConsumer(t *testing.T) { + receiver, err := createMetricsReceiver( + context.Background(), + component.ReceiverCreateParams{Logger: zap.NewNop()}, + createDefaultConfig(), + nil, + ) + + assert.Error(t, err, "nil consumer") + assert.Nil(t, receiver) +} diff --git a/receiver/statsdreceiver/go.mod b/receiver/statsdreceiver/go.mod index 3fc14e63859c0..2bcff762d7462 100644 --- a/receiver/statsdreceiver/go.mod +++ b/receiver/statsdreceiver/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.6.1 go.opencensus.io v0.22.5 go.opentelemetry.io/collector v0.16.1-0.20201207152538-326931de8c32 + go.opentelemetry.io/otel v0.13.0 go.uber.org/zap v1.16.0 google.golang.org/protobuf v1.25.0 ) diff --git a/receiver/statsdreceiver/protocol/parser.go b/receiver/statsdreceiver/protocol/parser.go index 14f752ae78bdf..7c5d9fe6e75d6 100644 --- a/receiver/statsdreceiver/protocol/parser.go +++ b/receiver/statsdreceiver/protocol/parser.go @@ -20,5 +20,7 @@ import ( // Parser is something that can map input StatsD strings to OTLP Metric representations. type Parser interface { - Parse(in string) (*metricspb.Metric, error) + Initialize() error + GetMetrics() []*metricspb.Metric + Aggregate(line string) error } diff --git a/receiver/statsdreceiver/protocol/statsd_parser.go b/receiver/statsdreceiver/protocol/statsd_parser.go index a3dfb1ce303b7..4cdff0c71a0bd 100644 --- a/receiver/statsdreceiver/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/protocol/statsd_parser.go @@ -22,6 +22,7 @@ import ( "time" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "go.opentelemetry.io/otel/label" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -31,67 +32,130 @@ var ( ) func getSupportedTypes() []string { - return []string{"c", "g", "ms"} + return []string{"c", "g"} } // StatsDParser supports the Parse method for parsing StatsD messages with Tags. -type StatsDParser struct{} +type StatsDParser struct { + gauges map[statsDMetricdescription]*metricspb.Metric + counters map[statsDMetricdescription]*metricspb.Metric +} type statsDMetric struct { + description statsDMetricdescription + value string + intvalue int64 + floatvalue float64 + addition bool + unit string + metricType metricspb.MetricDescriptor_Type + sampleRate float64 + labelKeys []*metricspb.LabelKey + labelValues []*metricspb.LabelValue +} + +type statsDMetricdescription struct { name string - value string statsdMetricType string - unit string - metricType metricspb.MetricDescriptor_Type - sampleRate float64 - labelKeys []*metricspb.LabelKey - labelValues []*metricspb.LabelValue + labels label.Distinct +} + +func (p *StatsDParser) Initialize() error { + p.gauges = make(map[statsDMetricdescription]*metricspb.Metric) + p.counters = make(map[statsDMetricdescription]*metricspb.Metric) + return nil +} + +// get the metrics preparing for flushing and reset the state +func (p *StatsDParser) GetMetrics() []*metricspb.Metric { + var metrics []*metricspb.Metric + + for _, metric := range p.gauges { + metrics = append(metrics, metric) + } + + for _, metric := range p.counters { + metrics = append(metrics, metric) + } + + p.gauges = make(map[statsDMetricdescription]*metricspb.Metric) + p.counters = make(map[statsDMetricdescription]*metricspb.Metric) + + return metrics } var timeNowFunc = func() int64 { return time.Now().Unix() } -// Parse returns an OTLP metric representation of the input StatsD string. -func (p *StatsDParser) Parse(line string) (*metricspb.Metric, error) { +//aggregate for each metric line +func (p *StatsDParser) Aggregate(line string) error { parsedMetric, err := parseMessageToMetric(line) if err != nil { - return nil, err + return err } + switch parsedMetric.description.statsdMetricType { + case "g": + _, ok := p.gauges[parsedMetric.description] + if !ok { + metricPoint := buildPoint(parsedMetric) + p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint) + } else { + if parsedMetric.addition { + savedValue := p.gauges[parsedMetric.description].GetTimeseries()[0].Points[0].GetDoubleValue() + parsedMetric.floatvalue = parsedMetric.floatvalue + savedValue + metricPoint := buildPoint(parsedMetric) + p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint) + } else { + metricPoint := buildPoint(parsedMetric) + p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint) + } + } - metricPoint, err := buildPoint(parsedMetric) - if err != nil { - return nil, err + case "c": + _, ok := p.counters[parsedMetric.description] + if !ok { + metricPoint := buildPoint(parsedMetric) + p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint) + } else { + savedValue := p.counters[parsedMetric.description].GetTimeseries()[0].Points[0].GetInt64Value() + parsedMetric.intvalue = parsedMetric.intvalue + savedValue + metricPoint := buildPoint(parsedMetric) + p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint) + } } - return buildMetric(parsedMetric, metricPoint), nil + return nil } -func parseMessageToMetric(line string) (*statsDMetric, error) { - result := &statsDMetric{} +func parseMessageToMetric(line string) (statsDMetric, error) { + result := statsDMetric{} parts := strings.Split(line, "|") if len(parts) < 2 { - return nil, fmt.Errorf("invalid message format: %s", line) + return result, fmt.Errorf("invalid message format: %s", line) } separatorIndex := strings.IndexByte(parts[0], ':') if separatorIndex < 0 { - return nil, fmt.Errorf("invalid : format: %s", parts[0]) + return result, fmt.Errorf("invalid : format: %s", parts[0]) } - result.name = parts[0][0:separatorIndex] - if result.name == "" { - return nil, errEmptyMetricName + result.description.name = parts[0][0:separatorIndex] + if result.description.name == "" { + return result, errEmptyMetricName } result.value = parts[0][separatorIndex+1:] if result.value == "" { - return nil, errEmptyMetricValue + return result, errEmptyMetricValue + } + if strings.HasPrefix(result.value, "-") || strings.HasPrefix(result.value, "+") { + result.addition = true } - result.statsdMetricType = parts[1] - if !contains(getSupportedTypes(), result.statsdMetricType) { - return nil, fmt.Errorf("unsupported metric type: %s", result.statsdMetricType) + result.description.statsdMetricType = parts[1] + if !contains(getSupportedTypes(), result.description.statsdMetricType) { + return result, fmt.Errorf("unsupported metric type: %s", result.description.statsdMetricType) } additionalParts := parts[2:] @@ -102,7 +166,7 @@ func parseMessageToMetric(line string) (*statsDMetric, error) { f, err := strconv.ParseFloat(sampleRateStr, 64) if err != nil { - return nil, fmt.Errorf("parse sample rate: %s", sampleRateStr) + return result, fmt.Errorf("parse sample rate: %s", sampleRateStr) } result.sampleRate = f @@ -114,21 +178,48 @@ func parseMessageToMetric(line string) (*statsDMetric, error) { result.labelKeys = make([]*metricspb.LabelKey, 0, len(tagSets)) result.labelValues = make([]*metricspb.LabelValue, 0, len(tagSets)) + var kvs []label.KeyValue + var sortable label.Sortable for _, tagSet := range tagSets { tagParts := strings.Split(tagSet, ":") if len(tagParts) != 2 { - return nil, fmt.Errorf("invalid tag format: %s", tagParts) + return result, fmt.Errorf("invalid tag format: %s", tagParts) } result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: tagParts[0]}) result.labelValues = append(result.labelValues, &metricspb.LabelValue{ Value: tagParts[1], HasValue: true, }) + kvs = append(kvs, label.String(tagParts[0], tagParts[1])) } + set := label.NewSetWithSortable(kvs, &sortable) + result.description.labels = set.Equivalent() } else { - return nil, fmt.Errorf("unrecognized message part: %s", part) + return result, fmt.Errorf("unrecognized message part: %s", part) } } + + switch result.description.statsdMetricType { + case "g": + f, err := strconv.ParseFloat(result.value, 64) + if err != nil { + return result, fmt.Errorf("gauge: parse metric value string: %s", result.value) + } + result.floatvalue = f + result.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE + case "c": + f, err := strconv.ParseFloat(result.value, 64) + if err != nil { + return result, fmt.Errorf("counter: parse metric value string: %s", result.value) + } + i := int64(f) + if 0 < result.sampleRate && result.sampleRate < 1 { + i = int64(f / result.sampleRate) + } + result.intvalue = i + result.metricType = metricspb.MetricDescriptor_GAUGE_INT64 + } + return result, nil } @@ -141,10 +232,10 @@ func contains(slice []string, element string) bool { return false } -func buildMetric(metric *statsDMetric, point *metricspb.Point) *metricspb.Metric { +func buildMetric(metric statsDMetric, point *metricspb.Point) *metricspb.Metric { return &metricspb.Metric{ MetricDescriptor: &metricspb.MetricDescriptor{ - Name: metric.name, + Name: metric.description.name, Type: metric.metricType, LabelKeys: metric.labelKeys, Unit: metric.unit, @@ -160,83 +251,37 @@ func buildMetric(metric *statsDMetric, point *metricspb.Point) *metricspb.Metric } } -func buildPoint(parsedMetric *statsDMetric) (*metricspb.Point, error) { +func buildPoint(parsedMetric statsDMetric) *metricspb.Point { now := ×tamppb.Timestamp{ Seconds: timeNowFunc(), } - switch parsedMetric.statsdMetricType { + switch parsedMetric.description.statsdMetricType { case "c": - return buildCounterPoint(parsedMetric, now, func(parsedMetric *statsDMetric) { - parsedMetric.metricType = metricspb.MetricDescriptor_CUMULATIVE_INT64 - }) + return buildCounterPoint(parsedMetric, now) case "g": - return buildGaugePoint(parsedMetric, now, func(parsedMetric *statsDMetric) { - parsedMetric.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE - }) - case "ms": - return buildTimerPoint(parsedMetric, now, func(parsedMetric *statsDMetric) { - parsedMetric.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE - }) + return buildGaugePoint(parsedMetric, now) } - return nil, fmt.Errorf("unhandled metric type: %s", parsedMetric.statsdMetricType) + return nil } -func buildCounterPoint(parsedMetric *statsDMetric, now *timestamppb.Timestamp, - metricTypeSetter func(parsedMetric *statsDMetric)) (*metricspb.Point, error) { - var point *metricspb.Point - var i int64 - - f, err := strconv.ParseFloat(parsedMetric.value, 64) - if err != nil { - return nil, fmt.Errorf("counter: parse metric value string: %s", parsedMetric.value) - } - i = int64(f) - if 0 < parsedMetric.sampleRate && parsedMetric.sampleRate < 1 { - i = int64(f / parsedMetric.sampleRate) - } - point = &metricspb.Point{ +func buildCounterPoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point { + point := &metricspb.Point{ Timestamp: now, Value: &metricspb.Point_Int64Value{ - Int64Value: i, + Int64Value: parsedMetric.intvalue, }, } - metricTypeSetter(parsedMetric) - return point, nil + return point } -func buildGaugePoint(parsedMetric *statsDMetric, now *timestamppb.Timestamp, - metricTypeSetter func(parsedMetric *statsDMetric)) (*metricspb.Point, error) { - var point *metricspb.Point - - f, err := strconv.ParseFloat(parsedMetric.value, 64) - if err != nil { - return nil, fmt.Errorf("gauge: parse metric value string: %s", parsedMetric.value) - } - point = &metricspb.Point{ - Timestamp: now, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: f, - }, - } - metricTypeSetter(parsedMetric) - return point, nil -} - -func buildTimerPoint(parsedMetric *statsDMetric, now *timestamppb.Timestamp, metricTypeSetter func(parsedMetric *statsDMetric)) (*metricspb.Point, error) { - var point *metricspb.Point - parsedMetric.unit = "ms" - f, err := strconv.ParseFloat(parsedMetric.value, 64) - if err != nil { - return nil, fmt.Errorf("timer: failed to parse metric value to float: %s", parsedMetric.value) - } - point = &metricspb.Point{ +func buildGaugePoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point { + point := &metricspb.Point{ Timestamp: now, Value: &metricspb.Point_DoubleValue{ - DoubleValue: f, + DoubleValue: parsedMetric.floatvalue, }, } - metricTypeSetter(parsedMetric) - return point, nil + return point } diff --git a/receiver/statsdreceiver/protocol/statsd_parser_test.go b/receiver/statsdreceiver/protocol/statsd_parser_test.go index a91574fc968fa..b73d0e3560169 100644 --- a/receiver/statsdreceiver/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/protocol/statsd_parser_test.go @@ -20,18 +20,16 @@ import ( metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/label" "google.golang.org/protobuf/types/known/timestamppb" ) -func Test_StatsDParser_Parse(t *testing.T) { - timeNowFunc = func() int64 { - return 0 - } +func Test_ParseMessageToMetric(t *testing.T) { tests := []struct { name string input string - wantMetric *metricspb.Metric + wantMetric statsDMetric err error }{ { @@ -55,38 +53,30 @@ func Test_StatsDParser_Parse(t *testing.T) { err: errors.New("empty metric value"), }, { - name: "integer counter", - input: "test.metric:42|c", - wantMetric: testMetric("test.metric", - metricspb.MetricDescriptor_CUMULATIVE_INT64, - nil, - nil, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_Int64Value{ - Int64Value: 42, - }, - }), + name: "invalid sample rate value", + input: "test.metric:42|c|@1.0a", + err: errors.New("parse sample rate: 1.0a"), }, { - name: "float counter", - input: "test.metric:42.0|c", - wantMetric: testMetric("test.metric", - metricspb.MetricDescriptor_CUMULATIVE_INT64, - nil, - nil, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_Int64Value{ - Int64Value: 42, - }, - }), + name: "invalid tag format", + input: "test.metric:42|c|#key1", + err: errors.New("invalid tag format: [key1]"), + }, + { + name: "unrecognized message part", + input: "test.metric:42|c|$extra", + err: errors.New("unrecognized message part: $extra"), + }, + { + name: "integer counter", + input: "test.metric:42|c", + wantMetric: testStatsDMetric("test.metric", + "42", + 42, + 0, + false, + "c", + "", 1, 0, nil, nil), }, { name: "invalid counter metric value", @@ -99,10 +89,17 @@ func Test_StatsDParser_Parse(t *testing.T) { err: errors.New("unsupported metric type: unhandled_type"), }, { - name: "counter metric with sample rate and tags", + name: "counter metric with sample rate and tag", input: "test.metric:42|c|@0.1|#key:value", - wantMetric: testMetric("test.metric", - metricspb.MetricDescriptor_CUMULATIVE_INT64, + wantMetric: testStatsDMetric("test.metric", + "42", + 420, + 0, + false, + "c", + "", + 1, + 0.1, []*metricspb.LabelKey{ { Key: "key", @@ -113,22 +110,20 @@ func Test_StatsDParser_Parse(t *testing.T) { Value: "value", HasValue: true, }, - }, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_Int64Value{ - Int64Value: 420, - }, }), }, { - name: "counter metric with sample rate(not divisible) and tags", + name: "counter metric with sample rate(not divisible) and tag", input: "test.metric:42|c|@0.8|#key:value", - wantMetric: testMetric("test.metric", - metricspb.MetricDescriptor_CUMULATIVE_INT64, + wantMetric: testStatsDMetric("test.metric", + "42", + 52, + 0, + false, + "c", + "", + 1, + 0.8, []*metricspb.LabelKey{ { Key: "key", @@ -139,48 +134,78 @@ func Test_StatsDParser_Parse(t *testing.T) { Value: "value", HasValue: true, }, - }, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_Int64Value{ - Int64Value: 52, - }, }), }, { - name: "double gauge metric", - input: "test.gauge:42.0|g|@0.1|#key:value", - wantMetric: testMetric("test.gauge", - metricspb.MetricDescriptor_GAUGE_DOUBLE, + name: "counter metric with sample rate(not divisible) and two tags", + input: "test.metric:42|c|@0.8|#key:value,key2:value2", + wantMetric: testStatsDMetric("test.metric", + "42", + 52, + 0, + false, + "c", + "", + 1, + 0.8, []*metricspb.LabelKey{ { Key: "key", }, + { + Key: "key2", + }, }, []*metricspb.LabelValue{ { Value: "value", HasValue: true, }, - }, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 42, + { + Value: "value2", + HasValue: true, }, }), }, { - name: "int gauge metric", - input: "test.gauge:42|g|@0.1|#key:value", - wantMetric: testMetric("test.gauge", - metricspb.MetricDescriptor_GAUGE_DOUBLE, + name: "double gauge", + input: "test.metric:42.0|g", + wantMetric: testStatsDMetric("test.metric", + "42.0", + 0, + 42, + false, + "g", + "", 2, 0, nil, nil), + }, + { + name: "int gauge", + input: "test.metric:42|g", + wantMetric: testStatsDMetric("test.metric", + "42", + 0, + 42, + false, + "g", + "", 2, 0, nil, nil), + }, + { + name: "invalid gauge metric value", + input: "test.metric:42.abc|g", + err: errors.New("gauge: parse metric value string: 42.abc"), + }, + { + name: "gauge metric with sample rate and tag", + input: "test.metric:11|g|@0.1|#key:value", + wantMetric: testStatsDMetric("test.metric", + "11", + 0, + 11, + false, + "g", + "", + 2, + 0.1, []*metricspb.LabelKey{ { Key: "key", @@ -191,98 +216,649 @@ func Test_StatsDParser_Parse(t *testing.T) { Value: "value", HasValue: true, }, - }, - "", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 42, - }, - }), - }, - { - name: "gauge: invalid metric value", - input: "test.metric:invalidValue|g", - err: errors.New("gauge: parse metric value string: invalidValue"), - }, - { - name: "timer metric with sample rate", - input: "test.timer:42.3|ms|@0.1", - wantMetric: testMetric("test.timer", - metricspb.MetricDescriptor_GAUGE_DOUBLE, - nil, - nil, - "ms", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 42.3, - }, }), }, { - name: "timer metric with sample rate and tag", - input: "test.timer:42|ms|@0.1|#key:value", - wantMetric: testMetric("test.timer", - metricspb.MetricDescriptor_GAUGE_DOUBLE, + name: "gauge metric with sample rate and two tags", + input: "test.metric:11|g|@0.8|#key:value,key2:value2", + wantMetric: testStatsDMetric("test.metric", + "11", + 0, + 11, + false, + "g", + "", + 2, + 0.8, []*metricspb.LabelKey{ { Key: "key", }, + { + Key: "key2", + }, }, []*metricspb.LabelValue{ { Value: "value", HasValue: true, }, - }, - "ms", - &metricspb.Point{ - Timestamp: ×tamppb.Timestamp{ - Seconds: 0, - }, - Value: &metricspb.Point_DoubleValue{ - DoubleValue: 42, + { + Value: "value2", + HasValue: true, }, }), }, { - name: "timer: invalid metric value", - input: "test.metric:invalidValue|ms", - err: errors.New("timer: failed to parse metric value to float: invalidValue"), + name: "double gauge plus", + input: "test.metric:+42.0|g", + wantMetric: testStatsDMetric("test.metric", + "+42.0", + 0, + 42, + true, + "g", + "", 2, 0, nil, nil), }, { - name: "invalid sample rate value", - input: "test.metric:42|c|@1.0a", - err: errors.New("parse sample rate: 1.0a"), + name: "double gauge minus", + input: "test.metric:-42.0|g", + wantMetric: testStatsDMetric("test.metric", + "-42.0", + 0, + -42, + true, + "g", + "", 2, 0, nil, nil), }, { - name: "invalid tag format", - input: "test.metric:42|c|#key1", - err: errors.New("invalid tag format: [key1]"), + name: "int gauge plus", + input: "test.metric:+42|g", + wantMetric: testStatsDMetric("test.metric", + "+42", + 0, + 42, + true, + "g", + "", 2, 0, nil, nil), }, { - name: "unrecognized message part", - input: "test.metric:42|c|$extra", - err: errors.New("unrecognized message part: $extra"), + name: "int gauge minus", + input: "test.metric:-42|g", + wantMetric: testStatsDMetric("test.metric", + "-42", + 0, + -42, + true, + "g", + "", 2, 0, nil, nil), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := &StatsDParser{} - got, err := p.Parse(tt.input) + got, err := parseMessageToMetric(tt.input) if tt.err != nil { - assert.Equal(t, err, tt.err) + assert.Equal(t, tt.err, err) } else { assert.NoError(t, err) - assert.Equal(t, got, tt.wantMetric) + assert.Equal(t, tt.wantMetric, got) + } + }) + } +} + +func testStatsDMetric(name string, + value string, intValue int64, + floatValue float64, + addition bool, statsdMetricType string, + unit string, metricType metricspb.MetricDescriptor_Type, + sampleRate float64, labelKeys []*metricspb.LabelKey, + labelValue []*metricspb.LabelValue) statsDMetric { + if len(labelKeys) > 0 { + var kvs []label.KeyValue + var sortable label.Sortable + for n, k := range labelKeys { + if labelValue[n].HasValue { + kvs = append(kvs, label.String(k.Key, labelValue[n].Value)) + } + } + set := label.NewSetWithSortable(kvs, &sortable) + return statsDMetric{ + description: statsDMetricdescription{ + name: name, + statsdMetricType: statsdMetricType, + labels: set.Equivalent(), + }, + value: value, + intvalue: intValue, + floatvalue: floatValue, + addition: addition, + unit: unit, + metricType: metricType, + sampleRate: sampleRate, + labelKeys: labelKeys, + labelValues: labelValue, + } + } + return statsDMetric{ + description: statsDMetricdescription{ + name: name, + statsdMetricType: statsdMetricType, + }, + value: value, + intvalue: intValue, + floatvalue: floatValue, + addition: addition, + unit: unit, + metricType: metricType, + sampleRate: sampleRate, + labelKeys: labelKeys, + labelValues: labelValue, + } +} + +func testDescription(name string, statsdMetricType string, keys []string, values []string) statsDMetricdescription { + var kvs []label.KeyValue + var sortable label.Sortable + for n, k := range keys { + kvs = append(kvs, label.String(k, values[n])) + } + set := label.NewSetWithSortable(kvs, &sortable) + return statsDMetricdescription{ + name: name, + statsdMetricType: statsdMetricType, + labels: set.Equivalent(), + } +} + +func TestStatsDParser_Aggregate(t *testing.T) { + timeNowFunc = func() int64 { + return 0 + } + + tests := []struct { + name string + input []string + expectedGauges map[statsDMetricdescription]*metricspb.Metric + expectedCounters map[statsDMetricdescription]*metricspb.Metric + err error + }{ + { + name: "parsedMetric error: empty metric value", + input: []string{ + "test.metric:|c", + }, + err: errors.New("empty metric value"), + }, + { + name: "parsedMetric error: empty metric name", + input: []string{ + ":42|c", + }, + err: errors.New("empty metric name"), + }, + { + name: "gauge plus", + input: []string{ + "statsdTestMetric1:1|g|#mykey:myvalue", + "statsdTestMetric2:2|g|#mykey:myvalue", + "statsdTestMetric1:+1|g|#mykey:myvalue", + "statsdTestMetric1:+100|g|#mykey:myvalue", + "statsdTestMetric1:+10000|g|#mykey:myvalue", + "statsdTestMetric2:+5|g|#mykey:myvalue", + "statsdTestMetric2:+500|g|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 10102, + }, + }), + testDescription("statsdTestMetric2", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 507, + }, + }), + }, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{}, + }, + { + name: "gauge minus", + input: []string{ + "statsdTestMetric1:5000|g|#mykey:myvalue", + "statsdTestMetric2:10|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric2:-5|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric1:-10|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric1:-100|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 4885, + }, + }), + testDescription("statsdTestMetric2", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 5, + }, + }), + }, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{}, + }, + { + name: "gauge plus and minus", + input: []string{ + "statsdTestMetric1:5000|g|#mykey:myvalue", + "statsdTestMetric1:4000|g|#mykey:myvalue", + "statsdTestMetric1:+500|g|#mykey:myvalue", + "statsdTestMetric1:-400|g|#mykey:myvalue", + "statsdTestMetric1:+2|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric2:365|g|#mykey:myvalue", + "statsdTestMetric2:+300|g|#mykey:myvalue", + "statsdTestMetric2:-200|g|#mykey:myvalue", + "statsdTestMetric2:200|g|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 4101, + }, + }), + testDescription("statsdTestMetric2", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 200, + }, + }), + }, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{}, + }, + { + name: "counter with increment and sample rate", + input: []string{ + "statsdTestMetric1:3000|c|#mykey:myvalue", + "statsdTestMetric1:4000|c|#mykey:myvalue", + "statsdTestMetric2:20|c|@0.8|#mykey:myvalue", + "statsdTestMetric2:20|c|@0.8|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{}, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 7000, + }, + }), + testDescription("statsdTestMetric2", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 50, + }, + }), + }, + }, + { + name: "counter and gauge: one gauge and two counters", + input: []string{ + "statsdTestMetric1:3000|c|#mykey:myvalue", + "statsdTestMetric1:500|g|#mykey:myvalue", + "statsdTestMetric1:400|g|#mykey:myvalue", + "statsdTestMetric1:+20|g|#mykey:myvalue", + "statsdTestMetric1:4000|c|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric2:20|c|@0.8|#mykey:myvalue", + "statsdTestMetric1:+2|g|#mykey:myvalue", + "statsdTestMetric2:20|c|@0.8|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 421, + }, + }), + }, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 7000, + }, + }), + testDescription("statsdTestMetric2", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 50, + }, + }), + }, + }, + { + name: "counter and gauge: 2 gauges and 2 counters", + input: []string{ + "statsdTestMetric1:500|g|#mykey:myvalue", + "statsdTestMetric1:400|g|#mykey:myvalue1", + "statsdTestMetric1:300|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue1", + "statsdTestMetric1:+20|g|#mykey:myvalue", + "statsdTestMetric1:-1|g|#mykey:myvalue", + "statsdTestMetric1:20|c|@0.1|#mykey:myvalue", + "statsdTestMetric2:50|c|#mykey:myvalue", + "statsdTestMetric1:15|c|#mykey:myvalue", + "statsdTestMetric2:5|c|@0.2|#mykey:myvalue", + }, + expectedGauges: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 319, + }, + }), + testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue1"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue1", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 399, + }, + }), + }, + expectedCounters: map[statsDMetricdescription]*metricspb.Metric{ + testDescription("statsdTestMetric1", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric1", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 215, + }, + }), + testDescription("statsdTestMetric2", "c", + []string{"mykey"}, []string{"myvalue"}): testMetric("statsdTestMetric2", + metricspb.MetricDescriptor_GAUGE_INT64, + []*metricspb.LabelKey{ + { + Key: "mykey", + }, + }, + []*metricspb.LabelValue{ + { + Value: "myvalue", + HasValue: true, + }, + }, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 75, + }, + }), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var err error + p := &StatsDParser{} + p.Initialize() + for _, line := range tt.input { + err = p.Aggregate(line) + } + if tt.err != nil { + assert.Equal(t, tt.err, err) + } else { + assert.Equal(t, tt.expectedGauges, p.gauges) + assert.Equal(t, tt.expectedCounters, p.counters) } }) } @@ -311,3 +887,109 @@ func testMetric(metricName string, }, } } + +func Test_contains(t *testing.T) { + tests := []struct { + name string + slice []string + element string + expected bool + }{ + { + name: "contain 1", + slice: []string{ + "m", + "g", + }, + element: "m", + expected: true, + }, + { + name: "contain 2", + slice: []string{ + "m", + "g", + }, + element: "g", + expected: true, + }, + { + name: "does not contain", + slice: []string{ + "m", + "g", + }, + element: "t", + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + answer := contains(tt.slice, tt.element) + assert.Equal(t, tt.expected, answer) + }) + } +} + +func TestStatsDParser_Initialize(t *testing.T) { + p := &StatsDParser{} + p.Initialize() + labels := label.Distinct{} + teststatsdDMetricdescription := statsDMetricdescription{ + name: "test", + statsdMetricType: "g", + labels: labels} + p.gauges[teststatsdDMetricdescription] = &metricspb.Metric{} + p.counters[teststatsdDMetricdescription] = &metricspb.Metric{} + assert.Equal(t, 1, len(p.gauges)) + assert.Equal(t, 1, len(p.counters)) +} + +func TestStatsDParser_GetMetrics(t *testing.T) { + p := &StatsDParser{} + p.Initialize() + p.gauges[testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"})] = testMetric("testGauge1", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + nil, + nil, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 1, + }, + }) + p.gauges[testDescription("statsdTestMetric1", "g", + []string{"mykey2"}, []string{"myvalue2"})] = testMetric("testGauge2", + metricspb.MetricDescriptor_GAUGE_DOUBLE, + nil, + nil, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_DoubleValue{ + DoubleValue: 2, + }, + }) + p.counters[testDescription("statsdTestMetric1", "g", + []string{"mykey"}, []string{"myvalue"})] = testMetric("testCounter1", + metricspb.MetricDescriptor_GAUGE_INT64, + nil, + nil, + "", + &metricspb.Point{ + Timestamp: ×tamppb.Timestamp{ + Seconds: 0, + }, + Value: &metricspb.Point_Int64Value{ + Int64Value: 1, + }, + }) + metrics := p.GetMetrics() + assert.Equal(t, 3, len(metrics)) +} diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 959da429e3428..bec28525a1d2e 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -19,10 +19,14 @@ import ( "fmt" "strings" "sync" + "time" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/translator/internaldata" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" @@ -41,6 +45,7 @@ type statsdReceiver struct { reporter transport.Reporter parser protocol.Parser nextConsumer consumer.MetricsConsumer + cancel context.CancelFunc startOnce sync.Once stopOnce sync.Once @@ -87,19 +92,38 @@ func buildTransportServer(config Config) (transport.Server, error) { } // StartMetricsReception starts a UDP server that can process StatsD messages. -func (r *statsdReceiver) Start(_ context.Context, host component.Host) error { +func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { r.Lock() defer r.Unlock() - err := componenterror.ErrAlreadyStarted r.startOnce.Do(func() { + ctx, r.cancel = context.WithCancel(ctx) + var transferChan = make(chan string, 10) + ticker := time.NewTicker(r.config.AggregationInterval) err = nil + r.parser.Initialize() go func() { - err = r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter) + err = r.server.ListenAndServe(r.parser, r.nextConsumer, r.reporter, transferChan) if err != nil { host.ReportFatalError(err) } }() + go func() { + for { + select { + case <-ticker.C: + metrics := r.parser.GetMetrics() + if len(metrics) > 0 { + r.Flush(ctx, metrics, r.nextConsumer) + } + case rawMetric := <-transferChan: + r.parser.Aggregate(rawMetric) + case <-ctx.Done(): + ticker.Stop() + return + } + } + }() }) return err @@ -113,6 +137,19 @@ func (r *statsdReceiver) Shutdown(context.Context) error { var err = componenterror.ErrAlreadyStopped r.stopOnce.Do(func() { err = r.server.Close() + r.cancel() }) return err } + +func (r *statsdReceiver) Flush(ctx context.Context, metrics []*metricspb.Metric, nextConsumer consumer.MetricsConsumer) error { + md := consumerdata.MetricsData{ + Metrics: metrics, + } + error := nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(md)) + if error != nil { + return error + } + + return nil +} diff --git a/receiver/statsdreceiver/receiver_test.go b/receiver/statsdreceiver/receiver_test.go index 4897e76869cae..80828b1280347 100644 --- a/receiver/statsdreceiver/receiver_test.go +++ b/receiver/statsdreceiver/receiver_test.go @@ -18,14 +18,16 @@ import ( "context" "errors" "net" - "runtime" "strconv" "testing" + "time" + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -48,13 +50,6 @@ func Test_statsdreceiver_New(t *testing.T) { args args wantErr error }{ - { - name: "default_config", - args: args{ - config: *defaultConfig, - nextConsumer: consumertest.NewMetricsNop(), - }, - }, { name: "nil_nextConsumer", args: args{ @@ -62,15 +57,6 @@ func Test_statsdreceiver_New(t *testing.T) { }, wantErr: componenterror.ErrNilNextConsumer, }, - { - name: "empty endpoint", - args: args{ - config: Config{ - ReceiverSettings: defaultConfig.ReceiverSettings, - }, - nextConsumer: consumertest.NewMetricsNop(), - }, - }, { name: "unsupported transport", args: args{ @@ -88,18 +74,25 @@ func Test_statsdreceiver_New(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := New(zap.NewNop(), tt.args.config, tt.args.nextConsumer) + _, err := New(zap.NewNop(), tt.args.config, tt.args.nextConsumer) assert.Equal(t, tt.wantErr, err) - if err == nil { - require.NotNil(t, got) - assert.NoError(t, got.Shutdown(context.Background())) - } else { - assert.Nil(t, got) - } }) } } +func TestStatsdReceiver_Flush(t *testing.T) { + ctx := context.Background() + cfg := createDefaultConfig().(*Config) + nextConsumer := consumertest.NewMetricsNop() + rcv, err := New(zap.NewNop(), *cfg, nextConsumer) + assert.NoError(t, err) + r := rcv.(*statsdReceiver) + var metrics = []*metricspb.Metric{} + assert.Nil(t, r.Flush(ctx, metrics, nextConsumer)) + r.Start(ctx, componenttest.NewNopHost()) + r.Shutdown(ctx) +} + func Test_statsdreceiver_EndToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) host, portStr, err := net.SplitHostPort(addr) @@ -113,9 +106,19 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { clientFn func(t *testing.T) *client.StatsD }{ { - name: "default_config", + name: "default_config with 9s interval", configFn: func() *Config { - return createDefaultConfig().(*Config) + return &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: configmodels.Type(typeStr), + NameVal: typeStr, + }, + NetAddr: confignet.NetAddr{ + Endpoint: defaultBindEndpoint, + Transport: defaultTransport, + }, + AggregationInterval: 9 * time.Second, + } }, clientFn: func(t *testing.T) *client.StatsD { c, err := client.NewStatsD(client.UDP, host, port) @@ -137,8 +140,6 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { r.reporter = mr require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) - runtime.Gosched() - defer r.Shutdown(context.Background()) require.Equal(t, componenterror.ErrAlreadyStarted, r.Start(context.Background(), componenttest.NewNopHost())) statsdClient := tt.clientFn(t) @@ -151,8 +152,7 @@ func Test_statsdreceiver_EndToEnd(t *testing.T) { err = statsdClient.SendMetric(statsdMetric) require.NoError(t, err) - mr.WaitAllOnMetricsProcessedCalls() - + time.Sleep(10 * time.Second) mdd := sink.AllMetrics() require.Len(t, mdd, 1) ocmd := internaldata.MetricsToOC(mdd[0]) diff --git a/receiver/statsdreceiver/testdata/config.yaml b/receiver/statsdreceiver/testdata/config.yaml index 6def063da98a0..52337190b2d6f 100644 --- a/receiver/statsdreceiver/testdata/config.yaml +++ b/receiver/statsdreceiver/testdata/config.yaml @@ -3,6 +3,7 @@ receivers: statsd/receiver_settings: endpoint: "localhost:12345" transport: "custom_transport" + aggregation_interval: 70s processors: exampleprocessor: diff --git a/receiver/statsdreceiver/transport/server.go b/receiver/statsdreceiver/transport/server.go index d6f0f0b6f70e3..f616a3b86c1b6 100644 --- a/receiver/statsdreceiver/transport/server.go +++ b/receiver/statsdreceiver/transport/server.go @@ -37,6 +37,7 @@ type Server interface { p protocol.Parser, mc consumer.MetricsConsumer, r Reporter, + transferChan chan<- string, ) error // Close stops any running ListenAndServe, however, it waits for any diff --git a/receiver/statsdreceiver/transport/server_test.go b/receiver/statsdreceiver/transport/server_test.go index 8512bf45c187a..bc14980a6d0f3 100644 --- a/receiver/statsdreceiver/transport/server_test.go +++ b/receiver/statsdreceiver/transport/server_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/testutil" - "go.opentelemetry.io/collector/translator/internaldata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/transport/client" @@ -63,12 +62,13 @@ func Test_Server_ListenAndServe(t *testing.T) { p := &protocol.StatsDParser{} require.NoError(t, err) mr := NewMockReporter(1) + var transferChan = make(chan string, 10) wgListenAndServe := sync.WaitGroup{} wgListenAndServe.Add(1) go func() { defer wgListenAndServe.Done() - assert.Error(t, srv.ListenAndServe(p, mc, mr)) + assert.Error(t, srv.ListenAndServe(p, mc, mr, transferChan)) }() runtime.Gosched() @@ -88,23 +88,11 @@ func Test_Server_ListenAndServe(t *testing.T) { err = gc.Disconnect() assert.NoError(t, err) - mr.WaitAllOnMetricsProcessedCalls() - err = srv.Close() assert.NoError(t, err) wgListenAndServe.Wait() - - mdd := mc.AllMetrics() - require.Len(t, mdd, 1) - ocmd := internaldata.MetricsToOC(mdd[0]) - require.Len(t, ocmd, 1) - require.Len(t, ocmd[0].Metrics, 1) - metric := ocmd[0].Metrics[0] - assert.Equal(t, "test.metric", metric.GetMetricDescriptor().GetName()) - - // require.Equal(t, 1, len(mc.md)) - // assert.Equal(t, "test.metric", mc.md[0].Metrics[0].GetMetricDescriptor().GetName()) + assert.Equal(t, 1, len(transferChan)) }) } } diff --git a/receiver/statsdreceiver/transport/udp_server.go b/receiver/statsdreceiver/transport/udp_server.go index 2dc881a53c821..46c176d1dbb3d 100644 --- a/receiver/statsdreceiver/transport/udp_server.go +++ b/receiver/statsdreceiver/transport/udp_server.go @@ -16,15 +16,11 @@ package transport import ( "bytes" - "context" "io" "net" "strings" - metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/translator/internaldata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol" ) @@ -53,6 +49,7 @@ func (u *udpServer) ListenAndServe( parser protocol.Parser, nextConsumer consumer.MetricsConsumer, reporter Reporter, + transferChan chan<- string, ) error { if parser == nil || nextConsumer == nil || reporter == nil { return errNilListenAndServeParameters @@ -66,7 +63,7 @@ func (u *udpServer) ListenAndServe( if n > 0 { bufCopy := make([]byte, n) copy(bufCopy, buf) - u.handlePacket(parser, nextConsumer, bufCopy) + u.handlePacket(bufCopy, transferChan) } if err != nil { u.reporter.OnDebugf("UDP Transport (%s) - ReadFrom error: %v", @@ -87,13 +84,9 @@ func (u *udpServer) Close() error { } func (u *udpServer) handlePacket( - p protocol.Parser, - nextConsumer consumer.MetricsConsumer, data []byte, + transferChan chan<- string, ) { - ctx := u.reporter.OnDataReceived(context.Background()) - var numReceivedMessages int - var metrics []*metricspb.Metric buf := bytes.NewBuffer(data) for { bytes, err := buf.ReadBytes((byte)('\n')) @@ -105,20 +98,7 @@ func (u *udpServer) handlePacket( } line := strings.TrimSpace(string(bytes)) if line != "" { - numReceivedMessages++ - metric, err := p.Parse(line) - if err != nil { - u.reporter.OnTranslationError(ctx, err) - continue - } - - metrics = append(metrics, metric) + transferChan <- line } } - - md := consumerdata.MetricsData{ - Metrics: metrics, - } - err := nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(md)) - u.reporter.OnMetricsProcessed(ctx, numReceivedMessages, err) }