[scrapehelper] Make the initial scrape interval configurable (#7635)
This change allows scrapers to start gathering values when the scrape controller starts, instead of having to wait a full collection interval before performing the initial scrape.

This is particularly useful for components such as the hostmetrics receiver, which generate utilization metrics that require an initial state before a value can be computed. With this change, these metrics start being sent at Start + Duration instead of Start + 2*Duration; for a scrape interval like 30s, that means the value is sent after 30 seconds rather than not until a full minute has elapsed.
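
As a concrete illustration of the timing arithmetic above, here is a minimal sketch; the 30s interval matches the example in the description, and the snippet is illustrative rather than part of the commit:

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 30 * time.Second

	// Before: the first scrape only fires after one full interval, so a
	// utilization metric that needs a prior sample is first emitted at
	// Start + 2*interval.
	fmt.Println("first utilization value, before:", 2*interval) // 1m0s

	// After: an initial scrape runs on start (following any initial_delay),
	// so the first utilization value is ready at Start + interval.
	fmt.Println("first utilization value, after:", interval) // 30s
}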
MovieStoreGuy committed Jun 2, 2023
1 parent deffd48 commit ce65350
Showing 3 changed files with 125 additions and 12 deletions.
19 changes: 19 additions & 0 deletions .chloggen/msg_enhacement-scrape-on-start.yaml
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: scraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Will start calling scrapers on component start.

# One or more tracking issues or pull requests related to the change
issues: [7635]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The change allows scrapers to perform their initial scrape on component start
and provides an initial delay. This means that the first scrape is delayed by `initial_delay`,
and each subsequent scrape then runs every `collection_interval`.
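
As a rough sketch of how these settings look in Go, using the struct fields added in this commit (the values and surrounding program are illustrative):

package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/receiver/scraperhelper"
)

func main() {
	// Illustrative values: delay the first scrape by one second, then
	// scrape every 30 seconds. The comments show the YAML keys the
	// mapstructure tags map to.
	cfg := scraperhelper.ScraperControllerSettings{
		CollectionInterval: 30 * time.Second, // collection_interval
		InitialDelay:       time.Second,      // initial_delay
	}
	fmt.Println(cfg.CollectionInterval, cfg.InitialDelay)
}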
12 changes: 12 additions & 0 deletions receiver/scraperhelper/scrapercontroller.go
@@ -24,13 +24,15 @@ import (
// of receiver.Settings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
CollectionInterval time.Duration `mapstructure:"collection_interval"`
InitialDelay time.Duration `mapstructure:"initial_delay"`
}

// NewDefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func NewDefaultScraperControllerSettings(component.Type) ScraperControllerSettings {
return ScraperControllerSettings{
CollectionInterval: time.Minute,
InitialDelay: time.Second,
}
}
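
Receivers built on scraperhelper conventionally embed these settings in their own config struct; a hedged sketch of that pattern follows, where the Config type and package name are hypothetical:

package myreceiver

import "go.opentelemetry.io/collector/receiver/scraperhelper"

// Config is a hypothetical receiver configuration. Squash-embedding the
// controller settings exposes `collection_interval` and the new
// `initial_delay` at the top level of the receiver's YAML config.
type Config struct {
	scraperhelper.ScraperControllerSettings `mapstructure:",squash"`

	// Receiver-specific options would follow here.
}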

@@ -61,6 +63,7 @@ type controller struct {
id component.ID
logger *zap.Logger
collectionInterval time.Duration
initialDelay time.Duration
nextConsumer consumer.Metrics

scrapers []Scraper
@@ -104,6 +107,7 @@ func NewScraperControllerReceiver(
id: set.ID,
logger: set.Logger,
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
nextConsumer: nextConsumer,
done: make(chan struct{}),
terminated: make(chan struct{}),
@@ -167,13 +171,21 @@ func (sc *controller) Shutdown(ctx context.Context) error {
// collection interval.
func (sc *controller) startScraping() {
go func() {
if sc.initialDelay > 0 {
<-time.After(sc.initialDelay)
}

if sc.tickerCh == nil {
ticker := time.NewTicker(sc.collectionInterval)
defer ticker.Stop()

sc.tickerCh = ticker.C
}

// Call the scrape method on initialisation to ensure
// that scrapers start from when the component starts
// instead of waiting a full collection interval.
sc.scrapeMetricsAndReport(context.Background())
for {
select {
case <-sc.tickerCh:
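
Taken together, the hunk above gives roughly the following scheduling behaviour. This is a simplified, self-contained sketch under that reading, not the controller's exact code:

package main

import (
	"fmt"
	"time"
)

// startScraping sketches the controller's new schedule: wait an optional
// initial delay, scrape once immediately, then scrape on every tick until
// done is closed.
func startScraping(initialDelay, interval time.Duration, scrape func(), done <-chan struct{}) {
	go func() {
		if initialDelay > 0 {
			<-time.After(initialDelay)
		}

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		// Initial scrape on start instead of after the first full interval.
		scrape()

		for {
			select {
			case <-ticker.C:
				scrape()
			case <-done:
				return
			}
		}
	}()
}

func main() {
	done := make(chan struct{})
	startScraping(0, 100*time.Millisecond, func() { fmt.Println("scrape") }, done)
	time.Sleep(350 * time.Millisecond)
	close(done)
}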
106 changes: 94 additions & 12 deletions receiver/scraperhelper/scrapercontroller_test.go
@@ -64,6 +64,13 @@ func (ts *testScrapeMetrics) scrape(_ context.Context) (pmetric.Metrics, error)
return md, nil
}

func newTestNoDelaySettings() *ScraperControllerSettings {
return &ScraperControllerSettings{
CollectionInterval: time.Second,
InitialDelay: 0,
}
}

type metricsTestCase struct {
name string

@@ -108,10 +115,11 @@ func TestScrapeController(t *testing.T) {
scrapeErr: errors.New("err1"),
},
{
name: "AddMetricsScrapersWithInitializeAndClose",
scrapers: 2,
initialize: true,
close: true,
name: "AddMetricsScrapersWithInitializeAndClose",
scrapers: 2,
initialize: true,
expectScraped: true,
close: true,
},
{
name: "AddMetricsScrapersWithInitializeAndCloseErrors",
@@ -124,6 +132,7 @@
}

for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(component.NewID("receiver"))
require.NoError(t, err)
@@ -142,8 +151,7 @@
if !test.nilNextConsumer {
nextConsumer = sink
}
defaultCfg := NewDefaultScraperControllerSettings("receiver")
cfg := &defaultCfg
cfg := newTestNoDelaySettings()
if test.scraperControllerSettings != nil {
cfg = test.scraperControllerSettings
}
@@ -167,6 +175,10 @@

if test.expectScraped || test.scrapeErr != nil {
// Consume the initial scrape on start for each configured scraper.
for _, ch := range scrapeMetricsChs {
<-ch
}
// validate that scrape is called at least N times for each configured scraper
for i := 0; i < iterations; i++ {
tickerCh <- time.Now()

@@ -178,7 +190,7 @@
// wait until all calls to scrape have completed
if test.scrapeErr == nil {
require.Eventually(t, func() bool {
return sink.DataPointCount() == iterations*(test.scrapers)
return sink.DataPointCount() == (1+iterations)*(test.scrapers)
}, time.Second, time.Millisecond)
}

@@ -316,12 +328,11 @@ func assertScraperViews(t *testing.T, tt obsreporttest.TestTelemetry, expectedEr
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, component.NewID("receiver"), component.NewID("scraper"), expectedScraped, expectedErrored))
}

func TestSingleScrapePerTick(t *testing.T) {
func TestSingleScrapePerInterval(t *testing.T) {
scrapeMetricsCh := make(chan int, 10)
tsm := &testScrapeMetrics{ch: scrapeMetricsCh}

defaultCfg := NewDefaultScraperControllerSettings("")
cfg := &defaultCfg
cfg := newTestNoDelaySettings()

tickerCh := make(chan time.Time)

@@ -341,12 +352,83 @@

tickerCh <- time.Now()

assert.Equal(t, 1, <-scrapeMetricsCh)
assert.Eventually(
t,
func() bool {
return <-scrapeMetricsCh == 2
},
300*time.Millisecond,
100*time.Millisecond,
"Make sure the scraper channel is called twice",
)

select {
case <-scrapeMetricsCh:
assert.Fail(t, "Scrape was called more than once")
assert.Fail(t, "Scrape was called more than twice")
case <-time.After(100 * time.Millisecond):
return
}
}

func TestScrapeControllerStartsOnInit(t *testing.T) {
t.Parallel()

tsm := &testScrapeMetrics{
ch: make(chan int, 1),
}

scp, err := NewScraper("", tsm.scrape)
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
&ScraperControllerSettings{
CollectionInterval: time.Hour,
InitialDelay: 0,
},
receivertest.NewNopCreateSettings(),
new(consumertest.MetricsSink),
AddScraper(scp),
)
require.NoError(t, err, "Must not error when creating scrape controller")

assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start")
<-time.After(500 * time.Nanosecond)
assert.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown")
assert.Equal(t, tsm.timesScrapeCalled, 1, "Must have been called as soon as the controller started")
}

func TestScrapeControllerInitialDelay(t *testing.T) {
if testing.Short() {
t.Skip("This requires real time to pass, skipping")
return
}

t.Parallel()

var (
elapsed = make(chan time.Time, 1)
cfg = NewDefaultScraperControllerSettings("my-delayed-scraper")
)

scp, err := NewScraper("timed", func(ctx context.Context) (pmetric.Metrics, error) {
elapsed <- time.Now()
return pmetric.NewMetrics(), nil
})
require.NoError(t, err, "Must not error when creating scraper")

r, err := NewScraperControllerReceiver(
&cfg,
receivertest.NewNopCreateSettings(),
new(consumertest.MetricsSink),
AddScraper(scp),
)
require.NoError(t, err, "Must not error when creating receiver")

t0 := time.Now()
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error when starting")
t1 := <-elapsed

assert.GreaterOrEqual(t, t1.Sub(t0), 300*time.Millisecond, "Must have had 300ms pass as defined by initial delay")

assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down")
}
