Skip to content

Commit

Permalink
Js/diagnose flaky prom tests (open-telemetry#12986)
Browse files Browse the repository at this point in the history
Addressed scrape timing configuration that was too tight for production test environment, causing test to fail sometimes due to borderline slowness of the http response loop.
Added additional race-condition protection to account for a potential race between the HTTP server's request processing and the receiver's response processing.
  • Loading branch information
jspaleta committed Aug 8, 2022
1 parent caf24b6 commit 2daae88
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions receiver/prometheusreceiver/metrics_receiver_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"sync"
"testing"
"time"

gokitlog "github.com/go-kit/log"
promcfg "github.com/prometheus/prometheus/config"
Expand Down Expand Up @@ -77,7 +78,6 @@ func newMockPrometheus(endpoints map[string][]mockPrometheusResponse) *mockProme
func (mp *mockPrometheus) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
mp.mu.Lock()
defer mp.mu.Unlock()

iptr, ok := mp.accessIndex[req.URL.Path]
if !ok {
rw.WriteHeader(404)
Expand Down Expand Up @@ -138,7 +138,8 @@ func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, er
job := make(map[string]interface{})
job["job_name"] = tds[i].name
job["metrics_path"] = metricPaths[i]
job["scrape_interval"] = "100ms"
job["scrape_interval"] = "1s"
job["scrape_timeout"] = "500ms"
job["static_configs"] = []map[string]interface{}{{"targets": []string{u.Host}}}
jobs = append(jobs, job)
}
Expand All @@ -160,6 +161,35 @@ func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, er
return mp, pCfg, err
}

// waitForScrapeResults blocks until the metrics sink has accumulated at least
// as many scrape results per target as the mock server is expected to serve
// (one per non-404 page), or until the 30s Eventually deadline expires.
func waitForScrapeResults(t *testing.T, targets []*testData, cms *consumertest.MetricsSink) {
	assert.Eventually(t, func() bool {
		// Receiver-side view of what has been collected so far,
		// grouped by target/job name.
		byTarget := splitMetricsByTarget(cms.AllMetrics())
		for _, td := range targets {
			jobName := td.name
			if td.relabeledJob != "" {
				jobName = td.relabeledJob
			}
			// Expected scrape count for this endpoint: every configured
			// page except 404s, mirroring the mock ServeHTTP response logic.
			expected := 0
			for _, page := range td.pages {
				if page.code != 404 {
					expected++
				}
			}
			if len(byTarget[jobName]) < expected {
				// Not enough scrapes yet; wait for another tick.
				return false
			}
		}
		return true
	}, 30*time.Second, 500*time.Millisecond)
}

func verifyNumValidScrapeResults(t *testing.T, td *testData, resourceMetrics []*pmetric.ResourceMetrics) {
want := 0
for _, p := range td.pages {
Expand Down Expand Up @@ -574,18 +604,25 @@ func testComponent(t *testing.T, targets []*testData, useStartTimeMetric bool, s
}, cms)

require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost()))

// verify state after shutdown is called
t.Cleanup(func() {
// verify state after shutdown is called
assert.Lenf(t, flattenTargets(receiver.scrapeManager.TargetsAll()), len(targets), "expected %v targets to be running", len(targets))
require.NoError(t, receiver.Shutdown(context.Background()))
assert.Len(t, flattenTargets(receiver.scrapeManager.TargetsAll()), 0, "expected scrape manager to have no targets")
})
// wait for all provided data to be scraped

// waitgroup Wait() is strictly from a server POV indicating the sufficient number and type of requests have been seen
mp.wg.Wait()
metrics := cms.AllMetrics()

// Note: waitForScrapeResults is an attempt to address a possible race between the waitgroup Done() call in the ServeHTTP function
// and the point at which the receiver actually processes the HTTP responses into metrics.
// It is an Eventually-style wait (timeout plus tick) that simply waits for a condition to hold;
// however, the condition waited on may be suboptimal and may need to be adjusted.
waitForScrapeResults(t, targets, cms)

// This begins the processing of the scrapes collected by the receiver
metrics := cms.AllMetrics()
// split and store results by target name
pResults := splitMetricsByTarget(metrics)
lres, lep := len(pResults), len(mp.endpoints)
Expand Down

0 comments on commit 2daae88

Please sign in to comment.