Skip to content

Commit

Permalink
pkg/trace/stats: support measured spans for trace stats calculation (DataDog#4866)
Browse files Browse the repository at this point in the history

This change adds support for a new metric on spans (`_dd.measured`) which will cause the concentrator to accept the spans having this metric for stats calculations.
  • Loading branch information
jdgumz committed Feb 14, 2020
1 parent d454524 commit 3ea70de
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 66 deletions.
5 changes: 2 additions & 3 deletions pkg/trace/stats/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,13 @@ func (c *Concentrator) addNow(i *Input, now int64) {
c.mu.Lock()

for _, s := range i.Trace {
// We do not compute stats for non top level spans since this is not surfaced in the UI
if !s.TopLevel {
if !(s.TopLevel || s.Measured) {
continue
}
end := s.Start + s.Duration
btime := end - end%c.bsize

// // If too far in the past, count in the oldest-allowed time bucket instead.
// If too far in the past, count in the oldest-allowed time bucket instead.
if btime < c.oldestTs {
btime = c.oldestTs
}
Expand Down
249 changes: 201 additions & 48 deletions pkg/trace/stats/concentrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/stretchr/testify/assert"
)

// testBucketInterval is the stats bucket size, in nanoseconds, used by the
// concentrator tests so span offsets line up with flush boundaries.
var testBucketInterval = (2 * time.Second).Nanoseconds()

func NewTestConcentrator() *Concentrator {
statsChan := make(chan []Bucket)
Expand Down Expand Up @@ -48,6 +50,87 @@ func testSpan(spanID uint64, parentID uint64, duration, offset int64, service, r
}
}

// newMeasuredSpan builds a test span fixture carrying the `_dd.measured`
// metric, so the concentrator computes stats for it even when the span is
// not top-level.
func newMeasuredSpan(spanID uint64, parentID uint64, duration, offset int64, name, service, resource string, err int32) *pb.Span {
	nowNs := time.Now().UnixNano()
	bucketStart := nowNs - nowNs%testBucketInterval
	span := &pb.Span{
		SpanID:   spanID,
		ParentID: parentID,
		Duration: duration,
		Start:    getTsInBucket(bucketStart, testBucketInterval, offset) - duration,
		Service:  service,
		Name:     name,
		Resource: resource,
		Error:    err,
		Type:     "db",
		Metrics:  map[string]float64{"_dd.measured": 1},
	}
	return span
}

// countValsEq asserts that, for every key in expected, actual contains a
// Count whose Value matches, and that actual has no extra keys. Only the
// Value field of each Count is compared.
func countValsEq(t *testing.T, expected map[string]float64, actual map[string]Count) {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()
	assert := assert.New(t)
	assert.Equal(len(expected), len(actual), "unexpected number of counts, got: %v", actual)
	for key, val := range expected {
		count, ok := actual[key]
		assert.True(ok, "Missing expected key from actual counts: %s", key)
		assert.Equal(val, count.Value, "Wrong value for count %s", key)
	}
}

// TestCountValsEq verifies that countValsEq matches expected values against
// actual Count entries by key.
func TestCountValsEq(t *testing.T) {
	ts := TagSet{
		Tag{
			Name:  "env",
			Value: "staging",
		},
		Tag{
			Name:  "service",
			Value: "myservice",
		},
		Tag{
			Name:  "resource",
			Value: "resource1",
		},
	}
	// NOTE: countValsEq only compares Value, but each fixture Count's
	// Key/Measure is kept consistent with its map key to avoid confusion.
	countValsEq(
		t,
		map[string]float64{
			"query|duration|env:staging,service:myservice,resource:resource1": 450.0,
			"query|hits|env:staging,service:myservice,resource:resource1":     1.0,
			"query|errors|env:staging,service:myservice,resource:resource1":   0.0,
		},
		map[string]Count{
			"query|duration|env:staging,service:myservice,resource:resource1": {
				Key:      "query|duration|env:staging,service:myservice,resource:resource1",
				Name:     "query",
				Measure:  "duration",
				TagSet:   ts,
				TopLevel: 1.0,
				Value:    450.0,
			},
			"query|hits|env:staging,service:myservice,resource:resource1": {
				Key:      "query|hits|env:staging,service:myservice,resource:resource1",
				Name:     "query",
				Measure:  "hits",
				TagSet:   ts,
				TopLevel: 1.0,
				Value:    1.0,
			},
			"query|errors|env:staging,service:myservice,resource:resource1": {
				Key:      "query|errors|env:staging,service:myservice,resource:resource1",
				Name:     "query",
				Measure:  "errors",
				TagSet:   ts,
				TopLevel: 1.0,
				Value:    0.0,
			},
		},
	)
}

// TestConcentratorOldestTs tests that the Agent doesn't report time buckets from a
// time before its start
func TestConcentratorOldestTs(t *testing.T) {
Expand Down Expand Up @@ -94,15 +177,14 @@ func TestConcentratorOldestTs(t *testing.T) {
t.FailNow()
}

// First oldest bucket aggregates old past time buckets, it should have it all.
for key, count := range stats[0].Counts {
if key == "query|duration|env:none,resource:resource1,service:A1" {
assert.Equal(151, int(count.Value), "Wrong value for duration")
}
if key == "query|hits|env:none,resource:resource1,service:A1" {
assert.Equal(6, int(count.Value), "Wrong value for hits")
}
// First oldest bucket aggregates old past time buckets, so each count
// should be an aggregated total across the spans.
expected := map[string]float64{
"query|duration|env:none,resource:resource1,service:A1": 151,
"query|hits|env:none,resource:resource1,service:A1": 6,
"query|errors|env:none,resource:resource1,service:A1": 0,
}
countValsEq(t, expected, stats[0].Counts)
})

t.Run("hot", func(t *testing.T) {
Expand All @@ -125,30 +207,27 @@ func TestConcentratorOldestTs(t *testing.T) {
}
flushTime += testBucketInterval

// First oldest bucket aggregates, it should have it all except the last span.
for key, count := range stats[0].Counts {
if key == "query|duration|env:none,resource:resource1,service:A1" {
assert.Equal(150, int(count.Value), "Wrong value for duration")
}
if key == "query|hits|env:none,resource:resource1,service:A1" {
assert.Equal(5, int(count.Value), "Wrong value for hits")
}
// First oldest bucket aggregates, it should have it all except the
// last four spans that have offset of 0.
expected := map[string]float64{
"query|duration|env:none,resource:resource1,service:A1": 150,
"query|hits|env:none,resource:resource1,service:A1": 5,
"query|errors|env:none,resource:resource1,service:A1": 0,
}
countValsEq(t, expected, stats[0].Counts)

stats = c.flushNow(flushTime)
if !assert.Equal(1, len(stats), "We should get exactly 1 Bucket") {
t.FailNow()
}

// Stats of the last span.
for key, count := range stats[0].Counts {
if key == "query|duration|env:none,resource:resource1,service:A1" {
assert.Equal(1, int(count.Value), "Wrong value for duration")
}
if key == "query|hits|env:none,resource:resource1,service:A1" {
assert.Equal(1, int(count.Value), "Wrong value for hits")
}
// Stats of the last four spans.
expected = map[string]float64{
"query|duration|env:none,resource:resource1,service:A1": 1,
"query|hits|env:none,resource:resource1,service:A1": 1,
"query|errors|env:none,resource:resource1,service:A1": 0,
}
countValsEq(t, expected, stats[0].Counts)
})
}

Expand Down Expand Up @@ -185,8 +264,9 @@ func TestConcentratorStatsTotals(t *testing.T) {
}
c.addNow(testTrace, time.Now().UnixNano())

var hits float64
var duration float64
var hits float64
var errors float64

flushTime := now
for i := 0; i <= c.bufferLen; i++ {
Expand All @@ -203,12 +283,16 @@ func TestConcentratorStatsTotals(t *testing.T) {
if key == "query|hits|env:none,resource:resource1,service:A1" {
hits += count.Value
}
if key == "query|errors|env:none,resource:resource1,service:A1" {
errors += count.Value
}
}
flushTime += c.bsize
}

assert.Equal(hits, float64(len(trace)), "Wrong value for total hits %d", hits)
assert.Equal(duration, float64(50+40+30+20+10+1), "Wrong value for total duration %d", duration)
assert.Equal(hits, float64(len(trace)), "Wrong value for total hits %d", hits)
assert.Equal(errors, float64(0), "Wrong value for total errors %d", errors)
}

// TestConcentratorStatsCounts tests exhaustively each stats bucket, over multiple time buckets.
Expand Down Expand Up @@ -245,8 +329,9 @@ func TestConcentratorStatsCounts(t *testing.T) {
testSpan(6, 0, 24, 0, "A1", "resource2", 0),
}

expectedCountValByKeyByTime := make(map[int64]map[string]int64)
expectedCountValByKeyByTime[alignedNow-2*testBucketInterval] = map[string]int64{
expectedCountValByKeyByTime := make(map[int64]map[string]float64)
// 2-bucket old flush
expectedCountValByKeyByTime[alignedNow-2*testBucketInterval] = map[string]float64{
"query|duration|env:none,resource:resource1,service:A1": 369,
"query|duration|env:none,resource:resource2,service:A2": 300000000040,
"query|duration|env:none,resource:resourcefoo,service:A2": 30,
Expand All @@ -257,7 +342,8 @@ func TestConcentratorStatsCounts(t *testing.T) {
"query|hits|env:none,resource:resource2,service:A2": 2,
"query|hits|env:none,resource:resourcefoo,service:A2": 1,
}
expectedCountValByKeyByTime[alignedNow-1*testBucketInterval] = map[string]int64{
// 1-bucket old flush
expectedCountValByKeyByTime[alignedNow-1*testBucketInterval] = map[string]float64{
"query|duration|env:none,resource:resource1,service:A1": 12,
"query|duration|env:none,resource:resource2,service:A1": 24,
"query|duration|env:none,resource:resource1,service:A2": 40,
Expand All @@ -274,12 +360,13 @@ func TestConcentratorStatsCounts(t *testing.T) {
"query|hits|env:none,resource:resource2,service:A2": 1,
"query|hits|env:none,resource:resourcefoo,service:A2": 1,
}
expectedCountValByKeyByTime[alignedNow] = map[string]int64{
// last bucket to be flushed
expectedCountValByKeyByTime[alignedNow] = map[string]float64{
"query|duration|env:none,resource:resource2,service:A1": 24,
"query|errors|env:none,resource:resource2,service:A1": 0,
"query|hits|env:none,resource:resource2,service:A1": 1,
}
expectedCountValByKeyByTime[alignedNow+testBucketInterval] = map[string]int64{}
expectedCountValByKeyByTime[alignedNow+testBucketInterval] = map[string]float64{}

traceutil.ComputeTopLevel(trace)
wt := NewWeightedTrace(trace, traceutil.GetRoot(trace))
Expand Down Expand Up @@ -313,14 +400,7 @@ func TestConcentratorStatsCounts(t *testing.T) {
expectedCountValByKey := expectedCountValByKeyByTime[expectedFlushedTs]
receivedCounts := receivedBuckets[0].Counts

// verify we got all counts
assert.Equal(len(expectedCountValByKey), len(receivedCounts), "GOT %v", receivedCounts)
// verify values
for key, val := range expectedCountValByKey {
count, ok := receivedCounts[key]
assert.True(ok, "%s was expected from concentrator", key)
assert.Equal(val, int64(count.Value), "Wrong value for count %s", key)
}
countValsEq(t, expectedCountValByKey, receivedCounts)

// Flushing again at the same time should return nothing
stats = c.flushNow(flushTime)
Expand Down Expand Up @@ -381,7 +461,7 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) {

// Start with the first/older bucket
receivedCounts = stats[0].Counts
expectedCountValByKey := map[string]int64{
expectedCountValByKey := map[string]float64{
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A1": 2000,
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A2": 2000,
"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A3": 370,
Expand Down Expand Up @@ -410,13 +490,86 @@ func TestConcentratorSublayersStatsCounts(t *testing.T) {
"query|hits|env:none,resource:resource4,service:A3": 1,
"query|hits|env:none,resource:resource6,service:A3": 1,
}
countValsEq(t, expectedCountValByKey, receivedCounts)
}

// verify we got all counts
assert.Equal(len(expectedCountValByKey), len(receivedCounts), "GOT %v", receivedCounts)
// verify values
for key, val := range expectedCountValByKey {
count, ok := receivedCounts[key]
assert.True(ok, "%s was expected from concentrator", key)
assert.Equal(val, int64(count.Value), "Wrong value for count %s", key)
// TestConcentratorAddNow tests the count aggregation behavior of addNow:
// top-level spans, measured spans, and traces mixing both.
func TestConcentratorAddNow(t *testing.T) {
	now := time.Now().UnixNano()
	for name, test := range map[string]struct {
		in  pb.Trace
		out map[string]float64
	}{
		// case of existing behavior: only top-level spans produce stats
		"top": {
			pb.Trace{
				testSpan(1, 0, 50, 5, "A1", "resource1", 0),
				testSpan(2, 1, 40, 4, "A1", "resource1", 1),
			},
			map[string]float64{
				"query|duration|env:none,resource:resource1,service:A1": 50,
				"query|hits|env:none,resource:resource1,service:A1":     1,
				"query|errors|env:none,resource:resource1,service:A1":   0,
				"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A1": 90,
				"query|_sublayers.duration.by_type|env:none,resource:resource1,service:A1,sublayer_type:db":       90,
				"query|_sublayers.span_count|env:none,resource:resource1,service:A1,:":                            2,
			},
		},
		// mixed = first span is both top-level _and_ measured
		"mixed": {
			pb.Trace{
				newMeasuredSpan(1, 0, 50, 5, "http.request", "A1", "resource1", 0),
				testSpan(2, 1, 40, 4, "A1", "resource1", 1),
			},
			map[string]float64{
				"http.request|duration|env:none,resource:resource1,service:A1": 50,
				"http.request|hits|env:none,resource:resource1,service:A1":     1,
				"http.request|errors|env:none,resource:resource1,service:A1":   0,
				"http.request|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A1": 90,
				"http.request|_sublayers.duration.by_type|env:none,resource:resource1,service:A1,sublayer_type:db":       90,
				"http.request|_sublayers.span_count|env:none,resource:resource1,service:A1,:":                            2,
			},
		},
		// distinct top-level and measured spans
		// only the top-level span gets sublayer metrics (span name == "query")
		"distinct": {
			pb.Trace{
				testSpan(1, 0, 50, 5, "A1", "resource1", 0),
				newMeasuredSpan(2, 1, 40, 4, "custom_query_op", "A1", "resource1", 1),
				testSpan(3, 2, 50, 5, "A1", "resource1", 0),
			},
			map[string]float64{
				"query|duration|env:none,resource:resource1,service:A1": 50,
				"query|hits|env:none,resource:resource1,service:A1":     1,
				"query|errors|env:none,resource:resource1,service:A1":   0,
				"query|_sublayers.duration.by_service|env:none,resource:resource1,service:A1,sublayer_service:A1": 140,
				"query|_sublayers.duration.by_type|env:none,resource:resource1,service:A1,sublayer_type:db":       140,
				"query|_sublayers.span_count|env:none,resource:resource1,service:A1,:":                            3,
				"custom_query_op|duration|env:none,resource:resource1,service:A1":                                 40,
				"custom_query_op|hits|env:none,resource:resource1,service:A1":                                     1,
				"custom_query_op|errors|env:none,resource:resource1,service:A1":                                   1,
			},
		},
	} {
		// Bind the subtest's own *testing.T (the original discarded it with
		// func(*testing.T), misattributing failures to the parent test).
		t.Run(name, func(t *testing.T) {
			statsChan := make(chan []Bucket)
			traceutil.ComputeTopLevel(test.in)
			wt := NewWeightedTrace(test.in, traceutil.GetRoot(test.in))
			testTrace := &Input{
				Env:   "none",
				Trace: wt,
			}
			subtraces := ExtractTopLevelSubtraces(test.in, traceutil.GetRoot(test.in))
			sublayers := make(map[*pb.Span][]SublayerValue)
			for _, subtrace := range subtraces {
				sublayers[subtrace.Root] = ComputeSublayers(subtrace.Trace)
			}
			testTrace.Sublayers = sublayers
			c := NewConcentrator([]string{}, testBucketInterval, statsChan)
			// Use the captured timestamp for both add and flush so the two
			// calls cannot straddle a bucket boundary.
			c.addNow(testTrace, now)
			stats := c.flushNow(now + (int64(c.bufferLen) * testBucketInterval))
			if len(stats) == 0 {
				t.Fatal("expected at least one flushed bucket")
			}
			countValsEq(t, test.out, stats[0].Counts)
		})
	}
}
2 changes: 2 additions & 0 deletions pkg/trace/stats/weight.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// WeightedSpan wraps a span with its sampling weight and the flags the stats
// concentrator uses to decide whether to compute metrics for it.
type WeightedSpan struct {
	Weight   float64 // Span weight. Similar to the trace root.Weight().
	TopLevel bool    // Is this span a service top-level or not. Similar to span.TopLevel().
	Measured bool    // Is this span marked for metrics computation.

	*pb.Span
}
Expand All @@ -32,6 +33,7 @@ func NewWeightedTrace(trace pb.Trace, root *pb.Span) WeightedTrace {
Span: trace[i],
Weight: weight,
TopLevel: traceutil.HasTopLevel(trace[i]),
Measured: traceutil.IsMeasured(trace[i]),
}
}
return wt
Expand Down
Loading

0 comments on commit 3ea70de

Please sign in to comment.