Skip to content

Commit

Permalink
[bugfix] Treat input throughput data as immutable (#3360)
Browse files Browse the repository at this point in the history
* Issue #3349 - enable `-shuffle=on` in tests

Signed-off-by: rbroggi <[email protected]>

* fixing test interdependency due to shared global variable (which is written to)

Signed-off-by: rbroggi <[email protected]>

* fixing test interdependency due to shared global variable

Signed-off-by: rbroggi <[email protected]>

* enforcing method immutability

Signed-off-by: rbroggi <[email protected]>

* temporarily disable shuffle

Signed-off-by: rbroggi <[email protected]>
  • Loading branch information
rbroggi committed Oct 31, 2021
1 parent e6b1c8f commit 514e72b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
16 changes: 15 additions & 1 deletion plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,26 @@ func (p *Processor) aggregateThroughput(throughputs []*model.Throughput) service
t.Count += throughput.Count
t.Probabilities = merge(t.Probabilities, throughput.Probabilities)
} else {
aggregatedThroughput[service][operation] = throughput
copyThroughput := model.Throughput{
Service: throughput.Service,
Operation: throughput.Operation,
Count: throughput.Count,
Probabilities: copySet(throughput.Probabilities),
}
aggregatedThroughput[service][operation] = &copyThroughput
}
}
return aggregatedThroughput
}

func copySet(in map[string]struct{}) map[string]struct{} {
out := make(map[string]struct{}, len(in))
for key := range in {
out[key] = struct{}{}
}
return out
}

func (p *Processor) initializeThroughput(endTime time.Time) {
for i := 0; i < p.AggregationBuckets; i++ {
startTime := endTime.Add(p.CalculationInterval * -1)
Expand Down
53 changes: 33 additions & 20 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

var (
testThroughputs = []*model.Throughput{
func testThroughputs() []*model.Throughput {
return []*model.Throughput{
{Service: "svcA", Operation: "GET", Count: 4, Probabilities: map[string]struct{}{"0.1": {}}},
{Service: "svcA", Operation: "GET", Count: 4, Probabilities: map[string]struct{}{"0.2": {}}},
{Service: "svcA", Operation: "PUT", Count: 5, Probabilities: map[string]struct{}{"0.1": {}}},
{Service: "svcB", Operation: "GET", Count: 3, Probabilities: map[string]struct{}{"0.1": {}}},
}
}

testThroughputBuckets = []*throughputBucket{
func testThroughputBuckets() []*throughputBucket {
return []*throughputBucket{
{
throughput: serviceOperationThroughput{
"svcA": map[string]*model.Throughput{
Expand All @@ -69,18 +71,29 @@ var (
interval: 60 * time.Second,
},
}
}

errTestStorage = errors.New("storage error")
func errTestStorage() error {
return errors.New("storage error")
}

testCalculator = calculationstrategy.CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 {
func testCalculator() calculationstrategy.ProbabilityCalculator {
return calculationstrategy.CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 {
factor := targetQPS / qps
return oldProbability * factor
})
)
}

func TestAggregateThroughputInputsImmutability(t *testing.T) {
p := &Processor{}
in := testThroughputs()
_ = p.aggregateThroughput(in)
assert.Equal(t, in, testThroughputs())
}

func TestAggregateThroughput(t *testing.T) {
p := &Processor{}
aggregatedThroughput := p.aggregateThroughput(testThroughputs)
aggregatedThroughput := p.aggregateThroughput(testThroughputs())
require.Len(t, aggregatedThroughput, 2)

throughput, ok := aggregatedThroughput["svcA"]
Expand Down Expand Up @@ -110,7 +123,7 @@ func TestAggregateThroughput(t *testing.T) {
func TestInitializeThroughput(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)).
Return(testThroughputs, nil)
Return(testThroughputs(), nil)
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*18), time.Time{}.Add(time.Minute*19)).
Return([]*model.Throughput{{Service: "svcA", Operation: "GET", Count: 7}}, nil)
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*17), time.Time{}.Add(time.Minute*18)).
Expand All @@ -130,7 +143,7 @@ func TestInitializeThroughput(t *testing.T) {
func TestInitializeThroughputFailure(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)).
Return(nil, errTestStorage)
Return(nil, errTestStorage())
p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

Expand All @@ -146,7 +159,7 @@ func TestCalculateQPS(t *testing.T) {
}

func TestGenerateOperationQPS(t *testing.T) {
p := &Processor{throughputs: testThroughputBuckets, Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

Expand Down Expand Up @@ -194,7 +207,7 @@ func TestGenerateOperationQPS(t *testing.T) {
}

func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) {
p := &Processor{throughputs: testThroughputBuckets, Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

Expand Down Expand Up @@ -258,7 +271,7 @@ func TestCalculateProbability(t *testing.T) {
p := &Processor{
Options: cfg,
probabilities: probabilities,
probabilityCalculator: testCalculator,
probabilityCalculator: testCalculator(),
throughputs: throughputs,
serviceCache: []SamplingCache{{"svcA": {}, "svcB": {}}},
}
Expand Down Expand Up @@ -302,8 +315,8 @@ func TestCalculateProbabilitiesAndQPS(t *testing.T) {
InitialSamplingProbability: 0.001,
BucketsForCalculation: 10,
},
throughputs: testThroughputBuckets, probabilities: prevProbabilities, qps: qps,
weightVectorCache: NewWeightVectorCache(), probabilityCalculator: testCalculator,
throughputs: testThroughputBuckets(), probabilities: prevProbabilities, qps: qps,
weightVectorCache: NewWeightVectorCache(), probabilityCalculator: testCalculator(),
operationsCalculatedGauge: mets.Gauge(metrics.Options{Name: "test"}),
}
probabilities, qps := p.calculateProbabilitiesAndQPS()
Expand All @@ -324,10 +337,10 @@ func TestRunCalculationLoop(t *testing.T) {
logger := zap.NewNop()
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(testThroughputs, nil)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage)
Return(testThroughputs(), nil)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"),
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage)
mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage())
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
Expand Down Expand Up @@ -366,7 +379,7 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
logger, logBuffer := testutils.NewLogger()
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(nil, errTestStorage)
Return(nil, errTestStorage())
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
Expand Down Expand Up @@ -442,7 +455,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
t.Skip("Skipped realistic calculation loop test")
logger := zap.NewNop()
// NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values
testThroughputs = []*model.Throughput{
testThroughputs := []*model.Throughput{
{Service: "svcA", Operation: "GET", Count: 10},
{Service: "svcA", Operation: "POST", Count: 9},
{Service: "svcA", Operation: "PUT", Count: 5},
Expand Down Expand Up @@ -840,7 +853,7 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) {

func TestErrors(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage)
mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage())
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
Return(nil, nil)

Expand Down

0 comments on commit 514e72b

Please sign in to comment.