Skip to content

Commit

Permalink
custom-build: 6.11.2 + branch gbbr/uds
Browse files Browse the repository at this point in the history
This is an APM-only custom build of the agent, containing the latest
Datadog Agent release (6.11.2) and the APM development branch from #3556.

It was created by running:

  git checkout 6.11.2
  git checkout gbbr/uds -- pkg/trace

  # clean up
  rm pkg/trace/writer/payload* pkg/watchdog/net.go
  rm pkg/trace/watchdog/info_nix_test.go
  • Loading branch information
gbbr committed May 27, 2019
1 parent 48d2c88 commit c6b38fe
Show file tree
Hide file tree
Showing 53 changed files with 1,441 additions and 936 deletions.
85 changes: 34 additions & 51 deletions pkg/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"runtime"
"sync/atomic"
"time"

Expand All @@ -10,9 +11,8 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/event"
"github.com/DataDog/datadog-agent/pkg/trace/filters"
"github.com/DataDog/datadog-agent/pkg/trace/info"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
"github.com/DataDog/datadog-agent/pkg/trace/osutil"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/sampler"
"github.com/DataDog/datadog-agent/pkg/trace/stats"
Expand Down Expand Up @@ -60,8 +60,8 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
dynConf := sampler.NewDynamicConfig(conf.DefaultEnv)

// inter-component channels
rawTraceChan := make(chan pb.Trace, 5000) // about 1000 traces/sec for 5 sec, TODO: move to *model.Trace
tracePkgChan := make(chan *writer.TracePackage)
rawTraceChan := make(chan pb.Trace, 5000)
tracePkgChan := make(chan *writer.TracePackage, 1000)
statsChan := make(chan []stats.Bucket)
serviceChan := make(chan pb.ServicesMetadata, 50)
filteredServiceChan := make(chan pb.ServicesMetadata, 50)
Expand Down Expand Up @@ -111,11 +111,6 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
func (a *Agent) Run() {
info.UpdatePreSampler(*a.Receiver.PreSampler.Stats()) // avoid exposing 0

a.start()
a.loop()
}

func (a *Agent) start() {
for _, starter := range []interface{ Start() }{
a.Receiver,
a.TraceWriter,
Expand All @@ -130,19 +125,36 @@ func (a *Agent) start() {
} {
starter.Start()
}

n := 1
if config.HasFeature("parallel_process") {
n = runtime.NumCPU()
}
for i := 0; i < n; i++ {
go a.work()
}

a.loop()
}

func (a *Agent) loop() {
ticker := time.NewTicker(a.conf.WatchdogInterval)
defer ticker.Stop()
func (a *Agent) work() {
for {
select {
case t := <-a.Receiver.Out:
case t, ok := <-a.Receiver.Out:
if !ok {
return
}
a.Process(t)
case <-ticker.C:
a.watchdog()
}
}

}

func (a *Agent) loop() {
for {
select {
case <-a.ctx.Done():
log.Info("exiting")
log.Info("Exiting...")
if err := a.Receiver.Stop(); err != nil {
log.Error(err)
}
Expand All @@ -164,10 +176,12 @@ func (a *Agent) loop() {
// passes it downstream.
func (a *Agent) Process(t pb.Trace) {
if len(t) == 0 {
log.Debugf("skipping received empty trace")
log.Debugf("Skipping received empty trace")
return
}

defer timing.Since("datadog.trace_agent.internal.process_trace_ms", time.Now())

// Root span is used to carry some trace-level metadata, such as sampling rate and priority.
root := traceutil.GetRoot(t)

Expand All @@ -194,7 +208,7 @@ func (a *Agent) Process(t pb.Trace) {
atomic.AddInt64(stat, 1)

if !a.Blacklister.Allows(root) {
log.Debugf("trace rejected by blacklister. root: %v", root)
log.Debugf("Trace rejected by blacklister. root: %v", root)
atomic.AddInt64(&ts.TracesFiltered, 1)
atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))
return
Expand Down Expand Up @@ -247,6 +261,7 @@ func (a *Agent) Process(t pb.Trace) {

go func(pt ProcessedTrace) {
defer watchdog.LogOnPanic()
defer timing.Since("datadog.trace_agent.internal.concentrator_ms", time.Now())
// Everything is sent to concentrator for stats, regardless of sampling.
a.Concentrator.Add(&stats.Input{
Trace: pt.WeightedTrace,
Expand All @@ -262,6 +277,7 @@ func (a *Agent) Process(t pb.Trace) {
// Run both full trace sampling and transaction extraction in another goroutine.
go func(pt ProcessedTrace) {
defer watchdog.LogOnPanic()
defer timing.Since("datadog.trace_agent.internal.sample_ms", time.Now())

tracePkg := writer.TracePackage{}

Expand Down Expand Up @@ -303,39 +319,6 @@ func (a *Agent) sample(pt ProcessedTrace) (sampled bool, rate float64) {
return sampledScore || sampledPriority, sampler.CombineRates(ratePriority, rateScore)
}

// dieFunc is used by watchdog to kill the agent; replaced in tests.
// It forwards the format string and its arguments unchanged to osutil.Exitf.
// It is declared as a package-level variable (rather than a plain function)
// so that it can be reassigned.
var dieFunc = func(fmt string, args ...interface{}) {
osutil.Exitf(fmt, args...)
}

// watchdog takes one snapshot of CPU, memory and network information,
// enforces the configured memory and connection limits, and recomputes the
// pre-sample rate.
//
// dieFunc is called when wi.Mem.Alloc exceeds conf.MaxMemory or when
// wi.Net.Connections exceeds conf.MaxConnections; a limit that is not greater
// than zero disables the corresponding check. The snapshot is then published
// through info.UpdateWatchdogInfo, and the pre-sampler is updated with the
// rate returned by sampler.CalcPreSampleRate.
func (a *Agent) watchdog() {
var wi watchdog.Info
wi.CPU = watchdog.CPU()
wi.Mem = watchdog.Mem()
wi.Net = watchdog.Net()

// Each limit only applies when it is configured to a value greater than zero.
if float64(wi.Mem.Alloc) > a.conf.MaxMemory && a.conf.MaxMemory > 0 {
dieFunc("exceeded max memory (current=%d, max=%d)", wi.Mem.Alloc, int64(a.conf.MaxMemory))
}
if int(wi.Net.Connections) > a.conf.MaxConnections && a.conf.MaxConnections > 0 {
dieFunc("exceeded max connections (current=%d, max=%d)", wi.Net.Connections, a.conf.MaxConnections)
}

info.UpdateWatchdogInfo(wi)

// Adjust pre-sampling dynamically
rate, err := sampler.CalcPreSampleRate(a.conf.MaxCPU, wi.CPU.UserAvg, a.Receiver.PreSampler.RealRate())
if err != nil {
log.Warnf("problem computing pre-sample rate: %v", err)
}
// The returned rate and the error (nil on success) are both stored
// unconditionally; a failure above is only logged, it does not abort.
a.Receiver.PreSampler.SetRate(rate)
a.Receiver.PreSampler.SetError(err)

// Report the resulting pre-sampler rate as a gauge and publish the stats.
preSamplerStats := a.Receiver.PreSampler.Stats()
metrics.Gauge("datadog.trace_agent.presampler_rate", preSamplerStats.Rate, nil, 1)
info.UpdatePreSampler(*preSamplerStats)
}

func traceContainsError(trace pb.Trace) bool {
for _, span := range trace {
if span.Error != 0 {
Expand Down
94 changes: 13 additions & 81 deletions pkg/trace/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package agent
import (
"bytes"
"context"
"fmt"
"math"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -39,69 +37,6 @@ func newMockSampler(wantSampled bool, wantRate float64) *Sampler {
return &Sampler{engine: testutil.NewMockEngine(wantSampled, wantRate)}
}

// TestWatchdog checks that the agent is killed once memory usage exceeds the
// configured maximum. It sets conf.MaxMemory to 1e7, allocates a buffer of
// twice that size, replaces dieFunc with a function that panics, and expects
// to recover a panic whose message starts with "exceeded max memory".
// The test does nothing in short mode.
func TestWatchdog(t *testing.T) {
if testing.Short() {
return
}

conf := config.New()
conf.Endpoints[0].APIKey = "apikey_2"
conf.MaxMemory = 1e7
conf.WatchdogInterval = time.Millisecond

// save the global mux aside, we don't want to break other tests
defaultMux := http.DefaultServeMux
http.DefaultServeMux = http.NewServeMux()

ctx, cancelFunc := context.WithCancel(context.Background())
agnt := NewAgent(ctx, conf)

defer func() {
cancelFunc()
// We need to manually close the receiver as the Run() func
// should have been broken and interrupted by the watchdog panic
agnt.Receiver.Stop()
http.DefaultServeMux = defaultMux
}()

// Recover the panic raised by the replacement dieFunc below. Only a string
// panic value with the expected prefix is accepted; anything else fails
// the test.
var killed bool
defer func() {
if r := recover(); r != nil {
killed = true
switch v := r.(type) {
case string:
if strings.HasPrefix(v, "exceeded max memory") {
t.Logf("watchdog worked, trapped the right error: %s", v)
runtime.GC() // make sure we clean up after allocating all this
return
}
}
t.Fatalf("unexpected error: %v", r)
}
}()

// allocating a lot of memory
buf := make([]byte, 2*int64(conf.MaxMemory))
// Touch the first and last bytes of the buffer.
buf[0] = 1
buf[len(buf)-1] = 1

// override the default die, else our test would stop, use a plain panic() instead
oldDie := dieFunc
defer func() { dieFunc = oldDie }()
dieFunc = func(format string, args ...interface{}) {
panic(fmt.Sprintf(format, args...))
}

// after some time, the watchdog should kill this
agnt.Run()

// without this, the runtime could be smart and free memory before we Run()
buf[0] = 2
buf[len(buf)-1] = 2

assert.True(t, killed)
}

// Test to make sure that the joined effort of the quantizer and truncator, in that order, produces the
// desired string
func TestFormatTrace(t *testing.T) {
Expand Down Expand Up @@ -565,20 +500,6 @@ func runTraceProcessingBenchmark(b *testing.B, c *config.AgentConfig) {
}
}

// BenchmarkWatchdog measures the time and allocations of repeated calls to
// (*Agent).watchdog on an agent created from config.New(). The agent is
// constructed before the timer is reset, so its setup is not measured.
func BenchmarkWatchdog(b *testing.B) {
conf := config.New()
conf.Endpoints[0].APIKey = "apikey_2"
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
ta := NewAgent(ctx, conf)

// Exclude the setup above from the measurement and report allocations.
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
ta.watchdog()
}
}

// Mimics behaviour of agent Process function
func formatTrace(t pb.Trace) pb.Trace {
for _, span := range t {
Expand Down Expand Up @@ -624,9 +545,20 @@ func benchThroughput(file string) func(*testing.B) {
// start the agent without the trace and stats writers; we will be draining
// these channels ourselves in the benchmarks, plus we don't want the writers
// resource usage to show up in the results.
agnt.start()
agnt.tracePkgChan = make(chan *writer.TracePackage)
go agnt.loop()
go agnt.Run()

// wait for receiver to start:
for {
resp, err := http.Get("http://localhost:8126/v0.4/traces")
if err != nil {
time.Sleep(time.Millisecond)
continue
}
if resp.StatusCode == 400 {
break
}
}

// drain every other channel to avoid blockage.
exit := make(chan bool)
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/agent/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func setupLogger(cfg *config.AgentConfig) error {
minLogLvl = seelog.InfoLvl
}
var duration time.Duration
if cfg.LogThrottlingEnabled {
if cfg.LogThrottling {
duration = 10 * time.Second
}
format := "common"
Expand Down
12 changes: 6 additions & 6 deletions pkg/trace/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func Run(ctx context.Context) {
if flags.CPUProfile != "" {
f, err := os.Create(flags.CPUProfile)
if err != nil {
log.Critical(err)
log.Error(err)
}
pprof.StartCPUProfile(f)
log.Info("CPU profiling started...")
Expand All @@ -76,11 +76,11 @@ func Run(ctx context.Context) {
if flags.PIDFilePath != "" {
err := pidfile.WritePID(flags.PIDFilePath)
if err != nil {
log.Criticalf("error writing PID file, exiting: %v", err)
log.Criticalf("Error writing PID file, exiting: %v", err)
os.Exit(1)
}

log.Infof("pid '%d' written to pid file '%s'", os.Getpid(), flags.PIDFilePath)
log.Infof("PID '%d' written to PID file '%s'", os.Getpid(), flags.PIDFilePath)
defer os.Remove(flags.PIDFilePath)
}

Expand All @@ -94,22 +94,22 @@ func Run(ctx context.Context) {
rand.Seed(time.Now().UTC().UnixNano())

agnt := NewAgent(ctx, cfg)
log.Infof("trace-agent running on host %s", cfg.Hostname)
log.Infof("Trace agent running on host %s", cfg.Hostname)
agnt.Run()

// collect memory profile
if flags.MemProfile != "" {
f, err := os.Create(flags.MemProfile)
if err != nil {
log.Critical("could not create memory profile: ", err)
log.Error("Could not create memory profile: ", err)
}

// get up-to-date statistics
runtime.GC()
// Not using WriteHeapProfile but instead calling WriteTo to
// make sure we pass debug=1 and resolve pointers to names.
if err := pprof.Lookup("heap").WriteTo(f, 1); err != nil {
log.Critical("could not write memory profile: ", err)
log.Error("Could not write memory profile: ", err)
}
f.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/agent/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Sampler) logStats() {

switch state := state.(type) {
case sampler.InternalState:
log.Debugf("%s: inTPS: %f, outTPS: %f, maxTPS: %f, offset: %f, slope: %f, cardinality: %d",
log.Tracef("%s: inTPS: %f, outTPS: %f, maxTPS: %f, offset: %f, slope: %f, cardinality: %d",
engineType, state.InTPS, state.OutTPS, state.MaxTPS, state.Offset, state.Slope, state.Cardinality)

// publish through expvar
Expand Down
Loading

0 comments on commit c6b38fe

Please sign in to comment.