diff --git a/pkg/trace/agent/agent.go b/pkg/trace/agent/agent.go index 7bbd51334c709..b4ec33f89e463 100644 --- a/pkg/trace/agent/agent.go +++ b/pkg/trace/agent/agent.go @@ -40,7 +40,8 @@ type Agent struct { // tags based on their type. obfuscator *obfuscate.Obfuscator - spansOut chan *writer.SampledSpans + In chan *api.Trace + Out chan *writer.SampledSpans // config conf *config.AgentConfig @@ -54,39 +55,24 @@ type Agent struct { // which may be cancelled in order to gracefully stop the agent. func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent { dynConf := sampler.NewDynamicConfig(conf.DefaultEnv) - - rawTraceChan := make(chan *api.Trace, 5000) - spansOut := make(chan *writer.SampledSpans, 1000) + in := make(chan *api.Trace, 5000) + out := make(chan *writer.SampledSpans, 1000) statsChan := make(chan []stats.Bucket) - r := api.NewHTTPReceiver(conf, dynConf, rawTraceChan) - c := stats.NewConcentrator( - conf.ExtraAggregators, - conf.BucketInterval.Nanoseconds(), - statsChan, - ) - - obf := obfuscate.NewObfuscator(conf.Obfuscation) - ss := NewScoreSampler(conf) - ess := NewErrorsSampler(conf) - ps := NewPrioritySampler(conf, dynConf) - ep := eventProcessorFromConf(conf) - tw := writer.NewTraceWriter(conf, spansOut) - sw := writer.NewStatsWriter(conf, statsChan) - return &Agent{ - Receiver: r, - Concentrator: c, + Receiver: api.NewHTTPReceiver(conf, dynConf, in), + Concentrator: stats.NewConcentrator(conf.ExtraAggregators, conf.BucketInterval.Nanoseconds(), statsChan), Blacklister: filters.NewBlacklister(conf.Ignore["resource"]), Replacer: filters.NewReplacer(conf.ReplaceTags), - ScoreSampler: ss, - ErrorsScoreSampler: ess, - PrioritySampler: ps, - EventProcessor: ep, - TraceWriter: tw, - StatsWriter: sw, - obfuscator: obf, - spansOut: spansOut, + ScoreSampler: NewScoreSampler(conf), + ErrorsScoreSampler: NewErrorsSampler(conf), + PrioritySampler: NewPrioritySampler(conf, dynConf), + EventProcessor: eventProcessorFromConf(conf), + TraceWriter: writer.NewTraceWriter(conf, out), + StatsWriter: writer.NewStatsWriter(conf, statsChan), + obfuscator: obfuscate.NewObfuscator(conf.Obfuscation), + In: in, + Out: out, conf: conf, dynConf: dynConf, ctx: ctx, @@ -119,7 +105,7 @@ func (a *Agent) Run() { func (a *Agent) work() { for { select { - case t, ok := <-a.Receiver.Out: + case t, ok := <-a.In: if !ok { return } @@ -259,7 +245,7 @@ func (a *Agent) sample(ts *info.TagStats, pt ProcessedTrace) { atomic.AddInt64(&ts.EventsSampled, int64(len(events))) if !ss.Empty() { - a.spansOut <- &ss + a.Out <- &ss } } diff --git a/pkg/trace/agent/agent_test.go b/pkg/trace/agent/agent_test.go index b44eb4066611a..ff5f7905e57d6 100644 --- a/pkg/trace/agent/agent_test.go +++ b/pkg/trace/agent/agent_test.go @@ -561,7 +561,7 @@ func benchThroughput(file string) func(*testing.B) { // start the agent without the trace and stats writers; we will be draining // these channels ourselves in the benchmarks, plus we don't want the writers // resource usage to show up in the results. - agnt.spansOut = make(chan *writer.SampledSpans) + agnt.Out = make(chan *writer.SampledSpans) go agnt.Run() // wait for receiver to start: @@ -614,7 +614,7 @@ func benchThroughput(file string) func(*testing.B) { loop: for { select { - case <-agnt.spansOut: + case <-agnt.Out: got++ if got == count { // processed everything! diff --git a/pkg/trace/api/api.go b/pkg/trace/api/api.go index 1003acb2e0674..765fa2cc0ad9f 100644 --- a/pkg/trace/api/api.go +++ b/pkg/trace/api/api.go @@ -68,8 +68,8 @@ const ( type HTTPReceiver struct { Stats *info.ReceiverStats RateLimiter *rateLimiter - Out chan *Trace + out chan *Trace conf *config.AgentConfig dynConf *sampler.DynamicConfig server *http.Server @@ -91,7 +91,7 @@ func NewHTTPReceiver(conf *config.AgentConfig, dynConf *sampler.DynamicConfig, o return &HTTPReceiver{ Stats: info.NewReceiverStats(), RateLimiter: newRateLimiter(), - Out: out, + out: out, conf: conf, dynConf: dynConf, @@ -246,7 +246,7 @@ func (r *HTTPReceiver) Stop() error { return err } r.wg.Wait() - close(r.Out) + close(r.out) return nil } @@ -403,7 +403,7 @@ func (r *HTTPReceiver) processTraces(ts *info.TagStats, traces pb.Traces) { continue } - r.Out <- &Trace{ + r.out <- &Trace{ Source: &ts.Tags, Spans: trace, } @@ -438,7 +438,7 @@ func (r *HTTPReceiver) loop() { r.watchdog(now) case now := <-t.C: metrics.Gauge("datadog.trace_agent.heartbeat", 1, nil, 1) - metrics.Gauge("datadog.trace_agent.receiver.out_chan_fill", float64(len(r.Out))/float64(cap(r.Out)), nil, 1) + metrics.Gauge("datadog.trace_agent.receiver.out_chan_fill", float64(len(r.out))/float64(cap(r.out)), nil, 1) // We update accStats with the new stats we collected accStats.Acc(r.Stats) diff --git a/pkg/trace/api/api_test.go b/pkg/trace/api/api_test.go index faef46b55a68c..fe73342bad262 100644 --- a/pkg/trace/api/api_test.go +++ b/pkg/trace/api/api_test.go @@ -144,7 +144,7 @@ func TestLegacyReceiver(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.Out: + case rt := <-tc.r.out: assert.Len(rt.Spans, 1) span := rt.Spans[0] assert.Equal(uint64(42), span.TraceID) @@ -207,7 +207,7 @@ func TestReceiverJSONDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.Out: + case rt := <-tc.r.out: assert.Len(rt.Spans, 1) span := rt.Spans[0] assert.Equal(uint64(42), span.TraceID) @@ -274,7 +274,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.Out: + case rt := <-tc.r.out: assert.Len(rt.Spans, 1) span := rt.Spans[0] assert.Equal(uint64(42), span.TraceID) @@ -296,7 +296,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) { // now we should be able to read the trace data select { - case rt := <-tc.r.Out: + case rt := <-tc.r.out: assert.Len(rt.Spans, 1) span := rt.Spans[0] assert.Equal(uint64(42), span.TraceID) @@ -375,7 +375,7 @@ func TestHandleTraces(t *testing.T) { for n := 0; n < 10; n++ { // consume the traces channel without doing anything select { - case <-receiver.Out: + case <-receiver.out: default: } @@ -497,7 +497,7 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) { b.StopTimer() // consume the traces channel without doing anything select { - case <-receiver.Out: + case <-receiver.out: default: } @@ -537,7 +537,7 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) { b.StopTimer() // consume the traces channel without doing anything select { - case <-receiver.Out: + case <-receiver.out: default: } @@ -652,7 +652,7 @@ func TestWatchdog(t *testing.T) { defer r.Stop() go func() { for { - <-r.Out + <-r.out } }() @@ -734,7 +734,7 @@ func TestOOMKill(t *testing.T) { defer r.Stop() go func() { for { - <-r.Out + <-r.out } }()