Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/trace/agent: simplify channels #3868

Merged
merged 1 commit into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 17 additions & 31 deletions pkg/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type Agent struct {
// tags based on their type.
obfuscator *obfuscate.Obfuscator

spansOut chan *writer.SampledSpans
In chan *api.Trace
Out chan *writer.SampledSpans

// config
conf *config.AgentConfig
Expand All @@ -54,39 +55,24 @@ type Agent struct {
// which may be cancelled in order to gracefully stop the agent.
func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
dynConf := sampler.NewDynamicConfig(conf.DefaultEnv)

rawTraceChan := make(chan *api.Trace, 5000)
spansOut := make(chan *writer.SampledSpans, 1000)
in := make(chan *api.Trace, 5000)
out := make(chan *writer.SampledSpans, 1000)
statsChan := make(chan []stats.Bucket)

r := api.NewHTTPReceiver(conf, dynConf, rawTraceChan)
c := stats.NewConcentrator(
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
statsChan,
)

obf := obfuscate.NewObfuscator(conf.Obfuscation)
ss := NewScoreSampler(conf)
ess := NewErrorsSampler(conf)
ps := NewPrioritySampler(conf, dynConf)
ep := eventProcessorFromConf(conf)
tw := writer.NewTraceWriter(conf, spansOut)
sw := writer.NewStatsWriter(conf, statsChan)

return &Agent{
Receiver: r,
Concentrator: c,
Receiver: api.NewHTTPReceiver(conf, dynConf, in),
Concentrator: stats.NewConcentrator(conf.ExtraAggregators, conf.BucketInterval.Nanoseconds(), statsChan),
Blacklister: filters.NewBlacklister(conf.Ignore["resource"]),
Replacer: filters.NewReplacer(conf.ReplaceTags),
ScoreSampler: ss,
ErrorsScoreSampler: ess,
PrioritySampler: ps,
EventProcessor: ep,
TraceWriter: tw,
StatsWriter: sw,
obfuscator: obf,
spansOut: spansOut,
ScoreSampler: NewScoreSampler(conf),
ErrorsScoreSampler: NewErrorsSampler(conf),
PrioritySampler: NewPrioritySampler(conf, dynConf),
EventProcessor: eventProcessorFromConf(conf),
TraceWriter: writer.NewTraceWriter(conf, out),
StatsWriter: writer.NewStatsWriter(conf, statsChan),
obfuscator: obfuscate.NewObfuscator(conf.Obfuscation),
In: in,
Out: out,
conf: conf,
dynConf: dynConf,
ctx: ctx,
Expand Down Expand Up @@ -119,7 +105,7 @@ func (a *Agent) Run() {
func (a *Agent) work() {
for {
select {
case t, ok := <-a.Receiver.Out:
case t, ok := <-a.In:
if !ok {
return
}
Expand Down Expand Up @@ -259,7 +245,7 @@ func (a *Agent) sample(ts *info.TagStats, pt ProcessedTrace) {
atomic.AddInt64(&ts.EventsSampled, int64(len(events)))

if !ss.Empty() {
a.spansOut <- &ss
a.Out <- &ss
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/trace/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func benchThroughput(file string) func(*testing.B) {
// start the agent without the trace and stats writers; we will be draining
// these channels ourselves in the benchmarks, plus we don't want the writers
// resource usage to show up in the results.
agnt.spansOut = make(chan *writer.SampledSpans)
agnt.Out = make(chan *writer.SampledSpans)
go agnt.Run()

// wait for receiver to start:
Expand Down Expand Up @@ -614,7 +614,7 @@ func benchThroughput(file string) func(*testing.B) {
loop:
for {
select {
case <-agnt.spansOut:
case <-agnt.Out:
got++
if got == count {
// processed everything!
Expand Down
10 changes: 5 additions & 5 deletions pkg/trace/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ const (
type HTTPReceiver struct {
Stats *info.ReceiverStats
RateLimiter *rateLimiter
Out chan *Trace

out chan *Trace
conf *config.AgentConfig
dynConf *sampler.DynamicConfig
server *http.Server
Expand All @@ -91,7 +91,7 @@ func NewHTTPReceiver(conf *config.AgentConfig, dynConf *sampler.DynamicConfig, o
return &HTTPReceiver{
Stats: info.NewReceiverStats(),
RateLimiter: newRateLimiter(),
Out: out,
out: out,

conf: conf,
dynConf: dynConf,
Expand Down Expand Up @@ -246,7 +246,7 @@ func (r *HTTPReceiver) Stop() error {
return err
}
r.wg.Wait()
close(r.Out)
close(r.out)
return nil
}

Expand Down Expand Up @@ -403,7 +403,7 @@ func (r *HTTPReceiver) processTraces(ts *info.TagStats, traces pb.Traces) {
continue
}

r.Out <- &Trace{
r.out <- &Trace{
Source: &ts.Tags,
Spans: trace,
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (r *HTTPReceiver) loop() {
r.watchdog(now)
case now := <-t.C:
metrics.Gauge("datadog.trace_agent.heartbeat", 1, nil, 1)
metrics.Gauge("datadog.trace_agent.receiver.out_chan_fill", float64(len(r.Out))/float64(cap(r.Out)), nil, 1)
metrics.Gauge("datadog.trace_agent.receiver.out_chan_fill", float64(len(r.out))/float64(cap(r.out)), nil, 1)

// We update accStats with the new stats we collected
accStats.Acc(r.Stats)
Expand Down
18 changes: 9 additions & 9 deletions pkg/trace/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestLegacyReceiver(t *testing.T) {

// now we should be able to read the trace data
select {
case rt := <-tc.r.Out:
case rt := <-tc.r.out:
assert.Len(rt.Spans, 1)
span := rt.Spans[0]
assert.Equal(uint64(42), span.TraceID)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestReceiverJSONDecoder(t *testing.T) {

// now we should be able to read the trace data
select {
case rt := <-tc.r.Out:
case rt := <-tc.r.out:
assert.Len(rt.Spans, 1)
span := rt.Spans[0]
assert.Equal(uint64(42), span.TraceID)
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) {

// now we should be able to read the trace data
select {
case rt := <-tc.r.Out:
case rt := <-tc.r.out:
assert.Len(rt.Spans, 1)
span := rt.Spans[0]
assert.Equal(uint64(42), span.TraceID)
Expand All @@ -296,7 +296,7 @@ func TestReceiverMsgpackDecoder(t *testing.T) {

// now we should be able to read the trace data
select {
case rt := <-tc.r.Out:
case rt := <-tc.r.out:
assert.Len(rt.Spans, 1)
span := rt.Spans[0]
assert.Equal(uint64(42), span.TraceID)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestHandleTraces(t *testing.T) {
for n := 0; n < 10; n++ {
// consume the traces channel without doing anything
select {
case <-receiver.Out:
case <-receiver.out:
default:
}

Expand Down Expand Up @@ -497,7 +497,7 @@ func BenchmarkHandleTracesFromOneApp(b *testing.B) {
b.StopTimer()
// consume the traces channel without doing anything
select {
case <-receiver.Out:
case <-receiver.out:
default:
}

Expand Down Expand Up @@ -537,7 +537,7 @@ func BenchmarkHandleTracesFromMultipleApps(b *testing.B) {
b.StopTimer()
// consume the traces channel without doing anything
select {
case <-receiver.Out:
case <-receiver.out:
default:
}

Expand Down Expand Up @@ -652,7 +652,7 @@ func TestWatchdog(t *testing.T) {
defer r.Stop()
go func() {
for {
<-r.Out
<-r.out
}
}()

Expand Down Expand Up @@ -734,7 +734,7 @@ func TestOOMKill(t *testing.T) {
defer r.Stop()
go func() {
for {
<-r.Out
<-r.out
}
}()

Expand Down