Skip to content

Commit

Permalink
[dogstatsd] do not read timestamp when the no aggregation pipeline is…
Browse files Browse the repository at this point in the history
… off (#13192)

* [dogstatsd] do not read timestamp when the no aggregation pipeline is off.

* [dogstatsd] add corresponding unit tests.
  • Loading branch information
remeh committed Aug 22, 2022
1 parent b710e71 commit fa18168
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
8 changes: 8 additions & 0 deletions pkg/dogstatsd/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ type parser struct {
// client. Defaulting to false, this opt-in flag is used to avoid changing tags cardinality
// for existing installations.
dsdOriginEnabled bool

// readTimestamps is true if the parser has to read timestamps from messages.
readTimestamps bool
}

func newParser(float64List *float64ListPool) *parser {
stringInternerCacheSize := config.Datadog.GetInt("dogstatsd_string_interner_size")
readTimestamps := config.Datadog.GetBool("dogstatsd_no_aggregation_pipeline")

return &parser{
interner: newStringInterner(stringInternerCacheSize),
readTimestamps: readTimestamps,
float64List: float64List,
dsdOriginEnabled: config.Datadog.GetBool("dogstatsd_origin_detection_client"),
}
Expand Down Expand Up @@ -165,6 +170,9 @@ func (p *parser) parseMetricSample(message []byte) (dogstatsdMetricSample, error
}
// timestamp
case bytes.HasPrefix(optionalField, timestampFieldPrefix):
if !p.readTimestamps {
continue
}
ts, err := strconv.ParseInt(string(optionalField[len(timestampFieldPrefix):]), 10, 0)
if err != nil {
return dogstatsdMetricSample{}, fmt.Errorf("could not parse dogstatsd timestamp %q: %v", optionalField[len(timestampFieldPrefix):], err)
Expand Down
28 changes: 27 additions & 1 deletion pkg/dogstatsd/parse_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -328,12 +329,32 @@ func TestParseMetricError(t *testing.T) {
}

func TestParseGaugeWithTimestamp(t *testing.T) {
// with tags and timestamp
// no timestamp should be read when the no agg pipeline is off

sample, err := parseMetricSample([]byte("metric:1234|g|#onetag|T1657100430"))

assert.NoError(t, err)

assert.Equal(t, "metric", sample.name)
assert.InEpsilon(t, 1234.0, sample.value, epsilon)
require.Nil(t, sample.values)
assert.Equal(t, gaugeType, sample.metricType)
require.Equal(t, 1, len(sample.tags))
assert.Equal(t, "onetag", sample.tags[0])
assert.InEpsilon(t, 1.0, sample.sampleRate, epsilon)
assert.Zero(t, sample.ts)

// enable the no aggregation pipeline

config.Datadog.Set("dogstatsd_no_aggregation_pipeline", true)
defer config.Datadog.Set("dogstatsd_no_aggregation_pipeline", false)

// with tags and timestamp

sample, err = parseMetricSample([]byte("metric:1234|g|#onetag|T1657100430"))

assert.NoError(t, err)

assert.Equal(t, "metric", sample.name)
assert.InEpsilon(t, 1234.0, sample.value, epsilon)
require.Nil(t, sample.values)
Expand Down Expand Up @@ -442,6 +463,11 @@ func TestParseGaugeWithTimestamp(t *testing.T) {
}

func TestParseGaugeTimestampMalformed(t *testing.T) {
// enable the no aggregation pipeline

config.Datadog.Set("dogstatsd_no_aggregation_pipeline", true)
defer config.Datadog.Set("dogstatsd_no_aggregation_pipeline", false)

// bad value

_, err := parseMetricSample([]byte("metric:1234|g|#onetag|TABCD"))
Expand Down
4 changes: 2 additions & 2 deletions pkg/dogstatsd/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func TestUDPReceive(t *testing.T) {
port, err := getAvailableUDPPort()
require.NoError(t, err)
config.Datadog.SetDefault("dogstatsd_port", port)
config.Datadog.SetDefault("dogstatsd_no_aggregation_pipeline", true)
config.Datadog.Set("dogstatsd_no_aggregation_pipeline", true)
defer func() {
config.Datadog.SetDefault("dogstatsd_no_aggregation_pipeline", false)
config.Datadog.Set("dogstatsd_no_aggregation_pipeline", false)
}()

demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(10 * time.Millisecond)
Expand Down

0 comments on commit fa18168

Please sign in to comment.