Skip to content

Commit

Permalink
[no-aggregation] return the MetricSampleBatch to the pool only once…
Browse files Browse the repository at this point in the history
… done with it (#17728)

* [no-aggregation] return the `MetricSampleBatch` to the pool only once done with it.

Returning the `MetricSampleBatch` earlier (in the `batcher.go`) was incorrect since
the `NoAggregationStreamWorker` goroutine might still be using it to process its content.

Returning it early was making the MetricSampleBatch available to any running
`TimeSamplerWorker`s, which has the time to write (by indices) its sample in
the MetricSampleBatch, eventually still processed (with access by index) by the
`NoAggregationStreaWorker`, resulting in misuse of the metric sample (or its discard
if it is a no-aggregation unsupported type such as Distribution and Histograms).
  • Loading branch information
remeh committed Jun 22, 2023
1 parent 4048b5d commit 19ecb94
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 2 deletions.
1 change: 0 additions & 1 deletion comp/dogstatsd/server/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ func (b *batcher) flushSamplesWithTs() {
tlmChannel.Observe(float64(t2.Sub(t1).Nanoseconds()), "late_metrics")

b.samplesWithTsCount = 0
b.metricSamplePool.PutBatch(b.samplesWithTs)
b.samplesWithTs = b.metricSamplePool.GetBatch()
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/aggregator/demultiplexer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func initAgentDemultiplexer(log log.Component, sharedForwarder forwarder.Forward
noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder)
noAggWorker = newNoAggregationStreamWorker(
config.Datadog.GetInt("dogstatsd_no_aggregation_pipeline_batch_size"),
metricSamplePool,
noAggSerializer,
agg.flushAndSerializeInParallel,
)
Expand Down
8 changes: 7 additions & 1 deletion pkg/aggregator/no_aggregation_stream_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type noAggregationStreamWorker struct {
flushConfig FlushAndSerializeInParallel
maxMetricsPerPayload int

// pointer to the shared MetricSamplePool stored in the Demultiplexer.
metricSamplePool *metrics.MetricSamplePool

seriesSink *metrics.IterableSeries
sketchesSink *metrics.IterableSketches

Expand Down Expand Up @@ -73,7 +76,8 @@ func init() {
noaggExpvars.Set("Flush", &expvarNoAggFlush)
}

func newNoAggregationStreamWorker(maxMetricsPerPayload int, serializer serializer.MetricSerializer, flushConfig FlushAndSerializeInParallel) *noAggregationStreamWorker {
func newNoAggregationStreamWorker(maxMetricsPerPayload int, metricSamplePool *metrics.MetricSamplePool,
serializer serializer.MetricSerializer, flushConfig FlushAndSerializeInParallel) *noAggregationStreamWorker {
return &noAggregationStreamWorker{
serializer: serializer,
flushConfig: flushConfig,
Expand Down Expand Up @@ -216,6 +220,8 @@ func (w *noAggregationStreamWorker) run() {
tlmNoAggSamplesProcessedUnsupportedType.Add(float64(countUnsupportedType))
expvarNoAggSamplesProcessedUnsupportedType.Add(int64(countUnsupportedType))

w.metricSamplePool.PutBatch(samples) // return the sample batch back to the pool for reuse

if serializedSamples > w.maxMetricsPerPayload {
tlmNoAggFlush.Add(1)
break mainloop // end `Serialize` call and trigger a flush to the forwarder
Expand Down
13 changes: 13 additions & 0 deletions releasenotes/notes/noagg-fix-pool-usage-9464ebcd9d33da37.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
fixes:
- |
Fixes a bug where the metric with timestamps pipeline could have wrongly
processed metrics without timestamps (when both pipelines were flooded),
potentially leading to inaccuracies.

0 comments on commit 19ecb94

Please sign in to comment.