Skip to content

Commit

Permalink
[statsdreceiver]add aggregation for StatsD receiver (open-telemetry#1670
Browse files Browse the repository at this point in the history
)

* add aggregation for StatsD receiver, support metrics types: gauge and counter

* revise readme for StatsD receiver

* Use Distinct in OTel-Go SDK as the description field for statsDMetric
  • Loading branch information
gavindoudou committed Dec 15, 2020
1 parent a35b1fa commit 763713b
Show file tree
Hide file tree
Showing 15 changed files with 1,088 additions and 310 deletions.
41 changes: 31 additions & 10 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# StatsD Receiver

StatsD receiver for ingesting StatsD messages into the OpenTelemetry Collector.
StatsD receiver for ingesting StatsD messages(https://github.com/statsd/statsd/blob/master/docs/metric_types.md) into the OpenTelemetry Collector.

Supported pipeline types: metrics

Expand All @@ -12,27 +12,46 @@ The following settings are required:

- `endpoint` (default = `localhost:8125`): Address and port to listen on.


The Following settings are optional:

- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)

Example:

```yaml
receivers:
statsd:
statsd/2:
endpoint: "localhost:8127"
aggregation_interval: 70s
```

The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).

## Aggregation

Currently the `statsdreceiver` is not providing any aggregation. There are
ideas such as the [Metrics Transform Processor
Proposal](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/332)
that intend to enable control over Metric aggregation in a processor.

An alternative will be to implement some simple aggregation in this receiver.

Aggregation is done in statsD receiver. The default aggregation interval is 60s. The receiver only aggregates the metrics with the same metric name, metric type, label keys and label values. After each aggregation interval, the receiver will send all metrics (after aggregation) in this aggregation interval to the following workflow.

It supports:

Gauge(transferred to double):
- statsdTestMetric1:500|g|#mykey:myvalue
statsdTestMetric1:400|g|#mykey:myvalue
(get the latest value: 400)
- statsdTestMetric1:500|g|#mykey:myvalue
statsdTestMetric1:+2|g|#mykey:myvalue
statsdTestMetric1:-1|g|#mykey:myvalue
(get the value after calculation: 501)

Counter(transferred to int):
- statsdTestMetric1:3000|c|#mykey:myvalue
statsdTestMetric1:4000|c|#mykey:myvalue
(get the value after incrementation: 7000)
- statsdTestMetric1:3000|c|#mykey:myvalue
statsdTestMetric1:20|c|@0.8|#mykey:myvalue
(get the value after incrementation with sample rate: 3000+20/0.8=3025)
## Metrics

General format is:
Expand All @@ -43,14 +62,15 @@ General format is:

`<name>:<value>|c|@<sample-rate>|#<tag1-key>:<tag1-value>`

it supports sample rate

### Gauge

`<name>:<value>|g|@<sample-rate>|#<tag1-key>:<tag1-value>`

### Timer

`<name>:<value>|ms|@<sample-rate>|#<tag1-key>:<tag1-value>`

TODO: add support for timer

## Testing

Expand All @@ -60,6 +80,7 @@ General format is:
receivers:
statsd:
endpoint: "localhost:8125" # default
aggregation_interval: 60s # default

exporters:
file:
Expand Down
3 changes: 3 additions & 0 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package statsdreceiver

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
)
Expand All @@ -23,4 +25,5 @@ import (
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
}
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package statsdreceiver
import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -54,5 +55,6 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "localhost:12345",
Transport: "custom_transport",
},
AggregationInterval: 70 * time.Second,
}, r1)
}
9 changes: 6 additions & 3 deletions receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package statsdreceiver

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand All @@ -26,9 +27,10 @@ import (

const (
// The value of "type" key in configuration.
typeStr = "statsd"
defaultBindEndpoint = "localhost:8125"
defaultTransport = "udp"
typeStr = "statsd"
defaultBindEndpoint = "localhost:8125"
defaultTransport = "udp"
defaultAggregationInterval = 60 * time.Second
)

// NewFactory creates a factory for the StatsD receiver.
Expand All @@ -50,6 +52,7 @@ func createDefaultConfig() configmodels.Receiver {
Endpoint: defaultBindEndpoint,
Transport: defaultTransport,
},
AggregationInterval: defaultAggregationInterval,
}
}

Expand Down
12 changes: 12 additions & 0 deletions receiver/statsdreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,15 @@ func TestCreateReceiver(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, tReceiver, "receiver creation failed")
}

func TestCreateMetricsReceiverWithNilConsumer(t *testing.T) {
receiver, err := createMetricsReceiver(
context.Background(),
component.ReceiverCreateParams{Logger: zap.NewNop()},
createDefaultConfig(),
nil,
)

assert.Error(t, err, "nil consumer")
assert.Nil(t, receiver)
}
1 change: 1 addition & 0 deletions receiver/statsdreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/stretchr/testify v1.6.1
go.opencensus.io v0.22.5
go.opentelemetry.io/collector v0.16.1-0.20201207152538-326931de8c32
go.opentelemetry.io/otel v0.13.0
go.uber.org/zap v1.16.0
google.golang.org/protobuf v1.25.0
)
4 changes: 3 additions & 1 deletion receiver/statsdreceiver/protocol/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Parse(in string) (*metricspb.Metric, error)
Initialize() error
GetMetrics() []*metricspb.Metric
Aggregate(line string) error
}
Loading

0 comments on commit 763713b

Please sign in to comment.