Skip to content

Commit

Permalink
[scraper helper] improve ease of use of scraper controller settings (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#7951)

- Reduce the complexity required in order to override values within
components
- Address feedback from
open-telemetry#7703 (comment)
  • Loading branch information
MovieStoreGuy committed Jul 13, 2023
1 parent 11effd3 commit 38ba4d5
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 22 deletions.
16 changes: 16 additions & 0 deletions .chloggen/msg_cleanup-scraper-helper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: scraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding optional timeout field to scrapers

# One or more tracking issues or pull requests related to the change
issues: [7951]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
39 changes: 18 additions & 21 deletions receiver/scraperhelper/scrapercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,6 @@ import (
"go.opentelemetry.io/collector/receiver/scrapererror"
)

// ScraperControllerSettings defines common settings for a scraper controller
// configuration. Scraper controller receivers can embed this struct, instead
// of receiver.Settings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
CollectionInterval time.Duration `mapstructure:"collection_interval"`
InitialDelay time.Duration `mapstructure:"initial_delay"`
}

// NewDefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func NewDefaultScraperControllerSettings(component.Type) ScraperControllerSettings {
return ScraperControllerSettings{
CollectionInterval: time.Minute,
InitialDelay: time.Second,
}
}

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption func(*controller)

Expand Down Expand Up @@ -64,6 +47,7 @@ type controller struct {
logger *zap.Logger
collectionInterval time.Duration
initialDelay time.Duration
timeout time.Duration
nextConsumer consumer.Metrics

scrapers []Scraper
Expand Down Expand Up @@ -108,6 +92,7 @@ func NewScraperControllerReceiver(
logger: set.Logger,
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
timeout: cfg.Timeout,
nextConsumer: nextConsumer,
done: make(chan struct{}),
terminated: make(chan struct{}),
Expand Down Expand Up @@ -181,15 +166,14 @@ func (sc *controller) startScraping() {

sc.tickerCh = ticker.C
}

// Call scrape method on initialision to ensure
// that scrapers start from when the component starts
// instead of waiting for the full duration to start.
sc.scrapeMetricsAndReport(context.Background())
sc.scrapeMetricsAndReport()
for {
select {
case <-sc.tickerCh:
sc.scrapeMetricsAndReport(context.Background())
sc.scrapeMetricsAndReport()
case <-sc.done:
sc.terminated <- struct{}{}
return
Expand All @@ -201,7 +185,10 @@ func (sc *controller) startScraping() {
// scrapeMetricsAndReport calls the Scrape function for each of the configured
// Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (sc *controller) scrapeMetricsAndReport(ctx context.Context) {
func (sc *controller) scrapeMetricsAndReport() {
ctx, done := withScrapeContext(sc.timeout)
defer done()

metrics := pmetric.NewMetrics()

for i, scraper := range sc.scrapers {
Expand Down Expand Up @@ -230,3 +217,13 @@ func (sc *controller) scrapeMetricsAndReport(ctx context.Context) {
func (sc *controller) stopScraping() {
close(sc.done)
}

// withScrapeContext will return a context that has no deadline if timeout is 0
// which implies no explicit timeout had occurred, otherwise, a context
// with a deadline of the provided timeout is returned.
func withScrapeContext(timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout == 0 {
return context.WithCancel(context.Background())
}
return context.WithTimeout(context.Background(), timeout)
}
5 changes: 4 additions & 1 deletion receiver/scraperhelper/scrapercontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ func TestScrapeControllerInitialDelay(t *testing.T) {

var (
elapsed = make(chan time.Time, 1)
cfg = NewDefaultScraperControllerSettings("my-delayed-scraper")
cfg = ScraperControllerSettings{
CollectionInterval: time.Second,
InitialDelay: 300 * time.Millisecond,
}
)

scp, err := NewScraper("timed", func(ctx context.Context) (pmetric.Metrics, error) {
Expand Down
53 changes: 53 additions & 0 deletions receiver/scraperhelper/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper"

import (
"errors"
"fmt"
"time"

"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
)

var (
errNonPositiveInterval = errors.New("requires positive value")
)

// ScraperControllerSettings defines common settings for a scraper controller
// configuration. Scraper controller receivers can embed this struct, instead
// of receiver.Settings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
// CollectionInterval sets the how frequently the scraper
// should be called and used as the context timeout
// to ensure that scrapers don't exceed the interval.
CollectionInterval time.Duration `mapstructure:"collection_interval"`
// InitialDelay sets the initial start delay for the scraper,
// any non positive value is assumed to be immediately.
InitialDelay time.Duration `mapstructure:"initial_delay"`
// Timeout is an optional value used to set scraper's context deadline.
Timeout time.Duration `mapstructure:"timeout"`
}

// NewDefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func NewDefaultScraperControllerSettings(component.Type) ScraperControllerSettings {
return ScraperControllerSettings{
CollectionInterval: time.Minute,
InitialDelay: time.Second,
Timeout: 0,
}
}

func (set *ScraperControllerSettings) Validate() (errs error) {
if set.CollectionInterval <= 0 {
errs = multierr.Append(errs, fmt.Errorf(`"collection_interval": %w`, errNonPositiveInterval))
}
if set.Timeout < 0 {
errs = multierr.Append(errs, fmt.Errorf(`"timeout": %w`, errNonPositiveInterval))
}
return errs
}
52 changes: 52 additions & 0 deletions receiver/scraperhelper/settings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraperhelper

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestScrapeControllerSettings(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
set ScraperControllerSettings
errVal string
}{
{
name: "default configuration",
set: NewDefaultScraperControllerSettings(""),
errVal: "",
},
{
name: "zero value configuration",
set: ScraperControllerSettings{},
errVal: `"collection_interval": requires positive value`,
},
{
name: "invalid timeout",
set: ScraperControllerSettings{
CollectionInterval: time.Minute,
Timeout: -1 * time.Minute,
},
errVal: `"timeout": requires positive value`,
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

err := tc.set.Validate()
if tc.errVal == "" {
assert.NoError(t, err, "Must not error")
return
}
assert.EqualError(t, err, tc.errVal, "Must match the expected error")
})
}
}

0 comments on commit 38ba4d5

Please sign in to comment.