diff --git a/comp/dogstatsd/server/batch.go b/comp/dogstatsd/server/batch.go index 763cc304a684b..ecf290cb5b005 100644 --- a/comp/dogstatsd/server/batch.go +++ b/comp/dogstatsd/server/batch.go @@ -206,7 +206,6 @@ func (b *batcher) flushSamplesWithTs() { tlmChannel.Observe(float64(t2.Sub(t1).Nanoseconds()), "late_metrics") b.samplesWithTsCount = 0 - b.metricSamplePool.PutBatch(b.samplesWithTs) b.samplesWithTs = b.metricSamplePool.GetBatch() } } diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index d68dc92bb86b4..89b7bcef8bebb 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -211,6 +211,7 @@ func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forward noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder) noAggWorker = newNoAggregationStreamWorker( config.Datadog.GetInt("dogstatsd_no_aggregation_pipeline_batch_size"), + metricSamplePool, noAggSerializer, agg.flushAndSerializeInParallel, ) diff --git a/pkg/aggregator/no_aggregation_stream_worker.go b/pkg/aggregator/no_aggregation_stream_worker.go index ac84264bc0efa..b2f3a15c9c5d2 100644 --- a/pkg/aggregator/no_aggregation_stream_worker.go +++ b/pkg/aggregator/no_aggregation_stream_worker.go @@ -36,6 +36,9 @@ type noAggregationStreamWorker struct { flushConfig FlushAndSerializeInParallel maxMetricsPerPayload int + // pointer to the shared MetricSamplePool stored in the Demultiplexer. + metricSamplePool *metrics.MetricSamplePool + seriesSink *metrics.IterableSeries sketchesSink *metrics.IterableSketches @@ -73,7 +76,8 @@ func init() { noaggExpvars.Set("Flush", &expvarNoAggFlush) } -func newNoAggregationStreamWorker(maxMetricsPerPayload int, serializer serializer.MetricSerializer, flushConfig FlushAndSerializeInParallel) *noAggregationStreamWorker { +func newNoAggregationStreamWorker(maxMetricsPerPayload int, metricSamplePool *metrics.MetricSamplePool, + serializer serializer.MetricSerializer, flushConfig FlushAndSerializeInParallel) *noAggregationStreamWorker { return &noAggregationStreamWorker{ serializer: serializer, flushConfig: flushConfig, @@ -216,6 +220,8 @@ func (w *noAggregationStreamWorker) run() { tlmNoAggSamplesProcessedUnsupportedType.Add(float64(countUnsupportedType)) expvarNoAggSamplesProcessedUnsupportedType.Add(int64(countUnsupportedType)) + w.metricSamplePool.PutBatch(samples) // return the sample batch back to the pool for reuse + if serializedSamples > w.maxMetricsPerPayload { tlmNoAggFlush.Add(1) break mainloop // end `Serialize` call and trigger a flush to the forwarder diff --git a/releasenotes/notes/noagg-fix-pool-usage-9464ebcd9d33da37.yaml b/releasenotes/notes/noagg-fix-pool-usage-9464ebcd9d33da37.yaml new file mode 100644 index 0000000000000..b475c2a855573 --- /dev/null +++ b/releasenotes/notes/noagg-fix-pool-usage-9464ebcd9d33da37.yaml @@ -0,0 +1,13 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +fixes: + - | + Fixes a bug where the metric with timestamps pipeline could have wrongly + processed metrics without timestamps (when both pipelines were flooded), + potentially leading to inaccuracies.