[demultiplexer] change the API to something clearer about DSD samples and timed samples #13247

Merged: 3 commits, Aug 31, 2022. The diff below shows changes from 2 commits.
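In summary, this PR renames the demultiplexer ingestion API so that the method names distinguish DogStatsD (DSD) samples, which go through a time sampler, from samples carrying their own timestamp, which bypass aggregation entirely:

- AddTimeSample → AggregateSample
- AddTimeSampleBatch → AggregateSamples
- AddLateMetrics → SendSamplesWithoutAggregation
- AddCheckSample → AggregateCheckSample (interface only; still unimplemented)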
6 changes: 3 additions & 3 deletions pkg/aggregator/aggregator_test.go
@@ -360,7 +360,7 @@ func TestDistributionsTooManyTags(t *testing.T) {
Host: "",
Timestamp: timeNowNano() - 10000000,
}
- demux.AddTimeSample(samp)
+ demux.AggregateSample(samp)

time.Sleep(1 * time.Second)

@@ -583,7 +583,7 @@ func flushSomeSamples(demux *AgentDemultiplexer) map[string]*metrics.Serie {
for i := 0; i < sampleCount; i++ {
name := fmt.Sprintf("serie%d", i)

- demux.AddTimeSample(metrics.MetricSample{Name: name, Value: value, Mtype: metrics.CountType, Timestamp: timestamp})
+ demux.AggregateSample(metrics.MetricSample{Name: name, Value: value, Mtype: metrics.CountType, Timestamp: timestamp})

if _, found := expectedSeries[name]; !found {
expectedSeries[name] = &metrics.Serie{
@@ -597,7 +597,7 @@ func flushSomeSamples(demux *AgentDemultiplexer) map[string]*metrics.Serie {
}
}

- // we have to wait here because AddTimeSample is async and we want to be
+ // we have to wait here because AggregateSample is async and we want to be
// sure all samples have been processed by the sampler
time.Sleep(1 * time.Second)

16 changes: 8 additions & 8 deletions pkg/aggregator/demultiplexer.go
@@ -43,21 +43,21 @@ type Demultiplexer interface {
// Serializer returns the serializer used by the Demultiplexer instance.
Serializer() serializer.MetricSerializer

- // Aggregation API
+ // Samples API
// --

- // AddTimeSample sends a MetricSample to the time sampler.
+ // AggregateSample sends a MetricSample to the DogStatsD time sampler.
// In sharded implementation, the metric is sent to the first time sampler.
- AddTimeSample(sample metrics.MetricSample)
- // AddTimeSampleBatch sends a batch of MetricSample to the given time
- // sampler shard.
+ AggregateSample(sample metrics.MetricSample)
+ // AggregateSamples sends a batch of MetricSample to the given DogStatsD
+ // time sampler shard.
// Implementation not supporting sharding may ignore the `shard` parameter.
- AddTimeSampleBatch(shard TimeSamplerID, samples metrics.MetricSampleBatch)
+ AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)

- // AddLateMetrics pushes metrics in the no-aggregation pipeline: a pipeline
+ // SendSamplesWithoutAggregation pushes metrics in the no-aggregation pipeline: a pipeline
// where the metrics are not sampled and sent as-is.
// This is the method to use to send metrics with a valid timestamp attached.
- AddLateMetrics(metrics metrics.MetricSampleBatch)
+ SendSamplesWithoutAggregation(metrics metrics.MetricSampleBatch)

// ForceFlushToSerializer flushes all the aggregated data from the different samplers to
// the serialization/forwarding parts.
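For illustration, a minimal sketch of how a caller might drive the renamed interface. The method and type names come from this diff; the metric names, values, and the way the demultiplexer instance is obtained are hypothetical, and the import paths are assumed from the repository layout:

```go
package example

import (
	"github.com/DataDog/datadog-agent/pkg/aggregator"
	"github.com/DataDog/datadog-agent/pkg/metrics"
)

// ingest shows the three ingestion paths side by side.
func ingest(demux aggregator.Demultiplexer) {
	// A DSD sample without a client timestamp: aggregated by the
	// (first) DogStatsD time sampler.
	demux.AggregateSample(metrics.MetricSample{
		Name:  "app.requests",
		Value: 1,
		Mtype: metrics.CountType,
	})

	// A batch routed to a specific time sampler shard; implementations
	// without sharding may ignore the shard ID.
	demux.AggregateSamples(aggregator.TimeSamplerID(0), metrics.MetricSampleBatch{
		{Name: "app.latency", Value: 0.25, Mtype: metrics.GaugeType},
	})

	// Samples carrying a valid timestamp: forwarded as-is through the
	// no-aggregation pipeline, with no sampling applied.
	demux.SendSamplesWithoutAggregation(metrics.MetricSampleBatch{
		{Name: "app.batch_job.duration", Value: 12.5, Mtype: metrics.GaugeType, Timestamp: 1661942400},
	})
}
```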
28 changes: 14 additions & 14 deletions pkg/aggregator/demultiplexer_agent.go
@@ -25,8 +25,8 @@ import (
type DemultiplexerWithAggregator interface {
Demultiplexer
Aggregator() *BufferedAggregator
- // AddCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
- AddCheckSample(sample metrics.MetricSample)
+ // AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
+ AggregateCheckSample(sample metrics.MetricSample)
Options() AgentDemultiplexerOptions
}

@@ -254,11 +254,11 @@ func (d *AgentDemultiplexer) Options() AgentDemultiplexerOptions {
return d.options
}

- // AddAgentStartupTelemetry adds a startup event and count (in a time sampler)
+ // AddAgentStartupTelemetry adds a startup event and count (in a DSD time sampler)
// to be sent on the next flush.
func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string) {
if agentVersion != "" {
- d.AddTimeSample(metrics.MetricSample{
+ d.AggregateSample(metrics.MetricSample{
Name: fmt.Sprintf("datadog.%s.started", d.aggregator.agentName),
Value: 1,
Tags: d.aggregator.tags(true),
@@ -528,24 +528,24 @@ func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*metric
return d.aggregator.GetBufferedChannels()
}

- // AddLateMetrics buffers a bunch of late metrics. This data will be directly
+ // SendSamplesWithoutAggregation buffers a bunch of metrics with timestamp. This data will be directly
// transmitted "as-is" (i.e. no aggregation, no sampling) to the serializer.
- func (d *AgentDemultiplexer) AddLateMetrics(samples metrics.MetricSampleBatch) {
+ func (d *AgentDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch) {
// safe-guard: if for some reasons we are receiving some metrics here despite
// having the no-aggregation pipeline disabled, they are redirected to the first
// time sampler.
if !d.options.EnableNoAggregationPipeline {
- d.AddTimeSampleBatch(TimeSamplerID(0), samples)
+ d.AggregateSamples(TimeSamplerID(0), samples)
return
}

tlmProcessed.Add(float64(len(samples)), "late_metrics")
d.statsd.noAggStreamWorker.addSamples(samples)
}

- // AddTimeSampleBatch adds a batch of MetricSample into the given time sampler shard.
- // If you have to submit a single metric sample see `AddTimeSample`.
- func (d *AgentDemultiplexer) AddTimeSampleBatch(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
+ // AggregateSamples adds a batch of MetricSample into the given DogStatsD time sampler shard.
+ // If you have to submit a single metric sample see `AggregateSample`.
+ func (d *AgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
// distribute the samples on the different statsd samplers using a channel
// (in the time sampler implementation) for latency reasons:
// its buffering + the fact that it is another goroutine processing the samples,
@@ -554,15 +554,15 @@ func (d *AgentDemultiplexer) AddTimeSampleBatch(shard TimeSamplerID, samples met
d.statsd.workers[shard].samplesChan <- samples
}

- // AddTimeSample adds a MetricSample in the first time sampler.
- func (d *AgentDemultiplexer) AddTimeSample(sample metrics.MetricSample) {
+ // AggregateSample adds a MetricSample in the first DogStatsD time sampler.
+ func (d *AgentDemultiplexer) AggregateSample(sample metrics.MetricSample) {
batch := d.GetMetricSamplePool().GetBatch()
batch[0] = sample
d.statsd.workers[0].samplesChan <- batch[:1]
}

- // AddCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
- func (d *AgentDemultiplexer) AddCheckSample(sample metrics.MetricSample) {
+ // AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
+ func (d *AgentDemultiplexer) AggregateCheckSample(sample metrics.MetricSample) {
panic("not implemented yet.")
}

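The safeguard in SendSamplesWithoutAggregation above is worth calling out: timestamped samples are never dropped when the no-aggregation pipeline is disabled, they just lose their fast path and go through the first time sampler. A self-contained sketch of that routing decision, with simplified stand-in types rather than the Agent's real ones:

```go
package sketch

// sample is a stand-in for metrics.MetricSample.
type sample struct {
	name  string
	value float64
	ts    float64
}

type demux struct {
	noAggEnabled bool
	shardChans   []chan []sample // one channel per DogStatsD time sampler shard
	noAggChan    chan []sample   // feeds the no-aggregation stream worker
}

// sendWithoutAggregation mirrors the fallback: with the pipeline
// disabled, the batch is redirected to the first time sampler shard.
func (d *demux) sendWithoutAggregation(batch []sample) {
	if !d.noAggEnabled {
		d.shardChans[0] <- batch
		return
	}
	d.noAggChan <- batch
}
```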
4 changes: 2 additions & 2 deletions pkg/aggregator/demultiplexer_agent_test.go
@@ -53,7 +53,7 @@ func TestDemuxNoAggOptionDisabled(t *testing.T) {

batch := testDemuxSamples(t)

- demux.AddLateMetrics(batch)
+ demux.SendSamplesWithoutAggregation(batch)
require.Len(demux.statsd.workers[0].samplesChan, 1)
read := <-demux.statsd.workers[0].samplesChan
require.Len(read, 3)
@@ -75,7 +75,7 @@ func TestDemuxNoAggOptionEnabled(t *testing.T) {

batch := testDemuxSamples(t)

- demux.AddLateMetrics(batch)
+ demux.SendSamplesWithoutAggregation(batch)
time.Sleep(200 * time.Millisecond) // give some time for the automatic flush to trigger
demux.Stop(true)

42 changes: 21 additions & 21 deletions pkg/aggregator/demultiplexer_mock.go
@@ -19,15 +19,22 @@ import (
// the samples that the TimeSamplers should have received.
type TestAgentDemultiplexer struct {
*AgentDemultiplexer
- receivedSamples []metrics.MetricSample
- lateMetrics []metrics.MetricSample
+ aggregatedSamples []metrics.MetricSample
+ noAggSamples []metrics.MetricSample
sync.Mutex
}

- // AddTimeSampleBatch implements a noop timesampler, appending the samples in an internal slice.
- func (a *TestAgentDemultiplexer) AddTimeSampleBatch(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
+ // AggregateSamples implements a noop timesampler, appending the samples in an internal slice.
+ func (a *TestAgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
a.Lock()
- a.receivedSamples = append(a.receivedSamples, samples...)
+ a.aggregatedSamples = append(a.aggregatedSamples, samples...)
a.Unlock()
}

+ // AggregateSample implements a noop timesampler, appending the sample in an internal slice.
+ func (a *TestAgentDemultiplexer) AggregateSample(sample metrics.MetricSample) {
+ a.Lock()
+ a.aggregatedSamples = append(a.aggregatedSamples, sample)
+ a.Unlock()
+ }

@@ -36,30 +43,23 @@ func (a *TestAgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*me
return a.aggregator.GetBufferedChannels()
}

- // AddTimeSample implements a noop timesampler, appending the sample in an internal slice.
- func (a *TestAgentDemultiplexer) AddTimeSample(sample metrics.MetricSample) {
- a.Lock()
- a.receivedSamples = append(a.receivedSamples, sample)
- a.Unlock()
- }

- // AddLateMetrics implements a fake no aggregation pipeline ingestion part,
+ // SendSamplesWithoutAggregation implements a fake no aggregation pipeline ingestion part,
// there will be NO AUTOMATIC FLUSH as it could exist in the real implementation
// Use Reset() to clean the buffer.
- func (a *TestAgentDemultiplexer) AddLateMetrics(metrics metrics.MetricSampleBatch) {
+ func (a *TestAgentDemultiplexer) SendSamplesWithoutAggregation(metrics metrics.MetricSampleBatch) {
a.Lock()
- a.lateMetrics = append(a.lateMetrics, metrics...)
+ a.noAggSamples = append(a.noAggSamples, metrics...)
a.Unlock()
}

func (a *TestAgentDemultiplexer) samples() (ontime []metrics.MetricSample, timed []metrics.MetricSample) {
a.Lock()
- ontime = make([]metrics.MetricSample, len(a.receivedSamples))
- timed = make([]metrics.MetricSample, len(a.lateMetrics))
- for i, s := range a.receivedSamples {
+ ontime = make([]metrics.MetricSample, len(a.aggregatedSamples))
+ timed = make([]metrics.MetricSample, len(a.noAggSamples))
+ for i, s := range a.aggregatedSamples {
ontime[i] = s
}
- for i, s := range a.lateMetrics {
+ for i, s := range a.noAggSamples {
timed[i] = s
}
a.Unlock()
@@ -122,8 +122,8 @@ func (a *TestAgentDemultiplexer) WaitEventPlatformEvents(eventType string, minEv
// Reset resets the internal samples slice.
func (a *TestAgentDemultiplexer) Reset() {
a.Lock()
- a.receivedSamples = a.receivedSamples[0:0]
- a.lateMetrics = a.lateMetrics[0:0]
+ a.aggregatedSamples = a.aggregatedSamples[0:0]
+ a.noAggSamples = a.noAggSamples[0:0]
a.Unlock()
}

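As a sketch of how the mock might be exercised in a test, using the same imports as the earlier sketch. How the *TestAgentDemultiplexer is constructed is not shown in this diff, so that part is omitted:

```go
func exerciseMock(demux *aggregator.TestAgentDemultiplexer) {
	// Lands in aggregatedSamples (the "ontime" slice in samples()).
	demux.AggregateSample(metrics.MetricSample{Name: "on.time", Value: 1})

	// Lands in noAggSamples ("timed"); nothing is flushed automatically.
	demux.SendSamplesWithoutAggregation(metrics.MetricSampleBatch{
		{Name: "timed", Value: 2, Timestamp: 1661942400},
	})

	// Reset clears both buffers between test cases.
	demux.Reset()
}
```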
14 changes: 7 additions & 7 deletions pkg/aggregator/demultiplexer_serverless.go
@@ -128,27 +128,27 @@ func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitFo
})
}

- // AddTimeSample send a MetricSample to the TimeSampler.
- func (d *ServerlessDemultiplexer) AddTimeSample(sample metrics.MetricSample) {
+ // AggregateSample send a MetricSample to the TimeSampler.
+ func (d *ServerlessDemultiplexer) AggregateSample(sample metrics.MetricSample) {
d.flushLock.Lock()
defer d.flushLock.Unlock()
batch := d.GetMetricSamplePool().GetBatch()
batch[0] = sample
d.statsdWorker.samplesChan <- batch[:1]
}

- // AddTimeSampleBatch send a MetricSampleBatch to the TimeSampler.
+ // AggregateSamples send a MetricSampleBatch to the TimeSampler.
// The ServerlessDemultiplexer is not using sharding in its DogStatsD pipeline,
// the `shard` parameter is ignored.
- // In the Serverless Agent, consider using `AddTimeSample` instead.
- func (d *ServerlessDemultiplexer) AddTimeSampleBatch(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
+ // In the Serverless Agent, consider using `AggregateSample` instead.
+ func (d *ServerlessDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
d.flushLock.Lock()
defer d.flushLock.Unlock()
d.statsdWorker.samplesChan <- samples
}

- // AddLateMetrics is not supported in the Serverless Agent implementation.
- func (d *ServerlessDemultiplexer) AddLateMetrics(samples metrics.MetricSampleBatch) {
+ // SendSamplesWithoutAggregation is not supported in the Serverless Agent implementation.
+ func (d *ServerlessDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch) {
panic("not implemented.")
}

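Since the serverless implementation panics on SendSamplesWithoutAggregation, a caller shared between Agent flavors has to route timestamped samples differently there. One hedged way to express that, following the doc comment's advice to prefer AggregateSample in the Serverless Agent (the helper below is illustrative, not an API from this diff):

```go
// sendServerless routes timestamped samples through the time sampler
// instead of the unsupported no-aggregation path.
func sendServerless(d aggregator.Demultiplexer, batch metrics.MetricSampleBatch) {
	for _, s := range batch {
		d.AggregateSample(s) // preferred over AggregateSamples in serverless
	}
}
```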
59 changes: 30 additions & 29 deletions pkg/dogstatsd/batch.go
@@ -22,12 +22,13 @@ type batcher struct {
// offset while writing into samples entries (i.e. samples currently stored per pipeline)
samplesCount []int

- // MetricSampleBatch used for late metrics
- // no multi-pipelines for late ones since we don't process them and we directly
+ // MetricSampleBatch used for metrics with timestamp
+ // no multi-pipelines use for metrics with timestamp since we don't process them and we directly
// send them to the serializer
- lateSamples metrics.MetricSampleBatch
- // offset while writing into the late sample slice (i.e. late samples currently stored)
- lateSamplesCount int
+ samplesWithTs metrics.MetricSampleBatch
+ // offset while writing into the sample with timestampe slice (i.e. count of samples
+ // with timestamp currently stored)
+ samplesWithTsCount int

events []*metrics.Event
serviceChecks []*metrics.ServiceCheck
@@ -86,14 +87,14 @@ func newBatcher(demux aggregator.DemultiplexerWithAggregator) *batcher {
}

// prepare the late samples buffer
- lateSamples := demux.GetMetricSamplePool().GetBatch()
- lateSamplesCount := 0
+ samplesWithTs := demux.GetMetricSamplePool().GetBatch()
+ samplesWithTsCount := 0

return &batcher{
samples: samples,
samplesCount: samplesCount,
- lateSamples: lateSamples,
- lateSamplesCount: lateSamplesCount,
+ samplesWithTs: samplesWithTs,
+ samplesWithTsCount: samplesWithTsCount,
metricSamplePool: demux.GetMetricSamplePool(),
choutEvents: e,
choutServiceChecks: sc,
@@ -112,20 +113,20 @@ func newServerlessBatcher(demux aggregator.Demultiplexer) *batcher {
samples := make([]metrics.MetricSampleBatch, pipelineCount)
samplesCount := make([]int, pipelineCount)

- lateSamples := demux.GetMetricSamplePool().GetBatch()
- lateSamplesCount := 0
+ samplesWithTs := demux.GetMetricSamplePool().GetBatch()
+ samplesWithTsCount := 0

for i := range samples {
samples[i] = demux.GetMetricSamplePool().GetBatch()
samplesCount[i] = 0
}

return &batcher{
- samples: samples,
- samplesCount: samplesCount,
- lateSamples: lateSamples,
- lateSamplesCount: lateSamplesCount,
- metricSamplePool: demux.GetMetricSamplePool(),
+ samples: samples,
+ samplesCount: samplesCount,
+ samplesWithTs: samplesWithTs,
+ samplesWithTsCount: samplesWithTsCount,
+ metricSamplePool: demux.GetMetricSamplePool(),

demux: demux,
pipelineCount: pipelineCount,
@@ -173,12 +174,12 @@ func (b *batcher) appendLateSample(sample metrics.MetricSample) {
return
}

- if b.lateSamplesCount == len(b.lateSamples) {
- b.flushLateSamples()
+ if b.samplesWithTsCount == len(b.samplesWithTs) {
+ b.flushSamplesWithTs()
}

- b.lateSamples[b.lateSamplesCount] = sample
- b.lateSamplesCount++
+ b.samplesWithTs[b.samplesWithTsCount] = sample
+ b.samplesWithTsCount++
}

// Flushing
@@ -187,7 +188,7 @@
func (b *batcher) flushSamples(shard uint32) {
if b.samplesCount[shard] > 0 {
t1 := time.Now()
- b.demux.AddTimeSampleBatch(aggregator.TimeSamplerID(shard), b.samples[shard][:b.samplesCount[shard]])
+ b.demux.AggregateSamples(aggregator.TimeSamplerID(shard), b.samples[shard][:b.samplesCount[shard]])
t2 := time.Now()
tlmChannel.Observe(float64(t2.Sub(t1).Nanoseconds()), "metrics")

@@ -196,16 +197,16 @@
}
}

- func (b *batcher) flushLateSamples() {
- if b.lateSamplesCount > 0 {
+ func (b *batcher) flushSamplesWithTs() {
+ if b.samplesWithTsCount > 0 {
t1 := time.Now()
- b.demux.AddLateMetrics(b.lateSamples[:b.lateSamplesCount])
+ b.demux.SendSamplesWithoutAggregation(b.samplesWithTs[:b.samplesWithTsCount])
t2 := time.Now()
tlmChannel.Observe(float64(t2.Sub(t1).Nanoseconds()), "late_metrics")

- b.lateSamplesCount = 0
- b.metricSamplePool.PutBatch(b.lateSamples)
- b.lateSamples = b.metricSamplePool.GetBatch()
+ b.samplesWithTsCount = 0
+ b.metricSamplePool.PutBatch(b.samplesWithTs)
+ b.samplesWithTs = b.metricSamplePool.GetBatch()
}
}

@@ -216,8 +217,8 @@
b.flushSamples(uint32(i))
}

- // flush all late samples to the serializer
- b.flushLateSamples()
+ // flush all samples with timestamp to the serializer
+ b.flushSamplesWithTs()

// flush events
if len(b.events) > 0 {
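The append path above uses a flush-when-full pattern: a fixed-size pooled batch is filled in place and handed off whenever it reaches capacity, or when the end-of-packet flush runs. The same pattern in isolation, with simplified stand-in types (the real code additionally recycles batches through a MetricSamplePool):

```go
package batchsketch

type sample struct{ name string }

// tsBatcher is a stand-in for the samplesWithTs bookkeeping in batcher.
type tsBatcher struct {
	buf   []sample       // fixed-capacity batch, like samplesWithTs
	count int            // write offset, like samplesWithTsCount
	send  func([]sample) // like demux.SendSamplesWithoutAggregation
}

func (b *tsBatcher) append(s sample) {
	if b.count == len(b.buf) {
		b.flush() // full: hand the batch off before storing more
	}
	b.buf[b.count] = s
	b.count++
}

func (b *tsBatcher) flush() {
	if b.count == 0 {
		return
	}
	b.send(b.buf[:b.count])
	b.count = 0 // the real code also swaps in a fresh pooled batch here
}
```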