Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make configsource Watchable an optional interface #3792

Merged
merged 1 commit into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 5 additions & 12 deletions config/experimental/configsource/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ var ErrSessionClosed = errors.New("parent session was closed")
// specific error must use errors.Is.
var ErrValueUpdated = errors.New("configuration must retrieve the updated value")

// ErrWatcherNotSupported is returned by WatchForUpdate functions when the configuration
// source can't watch for updates on the value.
// This error can be wrapped with additional information. Callers trying to identify this
// specific error must use errors.Is.
var ErrWatcherNotSupported = errors.New("value watcher is not supported")

// ConfigSource is the interface to be implemented by objects used by the collector
// to retrieve external configuration information.
type ConfigSource interface {
Expand Down Expand Up @@ -84,14 +78,13 @@ type Session interface {
type Retrieved interface {
// Value is the retrieved data that will be injected on the configuration.
Value() interface{}
}

// Watchable is an optional interface that Retrieved can implement if the given source
// supports monitoring for updates.
type Watchable interface {
// WatchForUpdate is used to monitor for updates on the retrieved value.
//
// If a watcher is not supported by the configuration store in general or for the specific
// retrieved value the WatchForUpdate must immediately return ErrWatcherNotSupported or
// an error wrapping it.
//
// When watching is supported WatchForUpdate must not return until one of the following happens:
// It must not return until one of the following happens:
//
// 1. An update is detected for the monitored value. In this case the function should
// return ErrValueUpdated or an error wrapping it.
Expand Down
17 changes: 7 additions & 10 deletions config/internal/configsource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type Manager struct {
// into the configuration.
sessions map[string]configsource.Session
// watchers keeps track of all WatchForUpdate functions for retrieved values.
watchers []func() error
watchers []configsource.Watchable
// watchersWG is used to ensure that Close waits for all WatchForUpdate calls
// to complete.
watchersWG sync.WaitGroup
Expand Down Expand Up @@ -220,18 +220,13 @@ func (m *Manager) WatchForUpdate() error {
doneCh := make(chan struct{})
defer close(doneCh)

for _, watcher := range m.watchers {
for i := range m.watchers {
watcher := m.watchers[i]
m.watchersWG.Add(1)
watcherFn := watcher
go func() {
defer m.watchersWG.Done()

err := watcherFn()
err := watcher.WatchForUpdate()
switch {
case errors.Is(err, configsource.ErrWatcherNotSupported):
// The watcher for the retrieved value is not supported, nothing to
// do, just exit from the goroutine.
return
case errors.Is(err, configsource.ErrSessionClosed):
// The Session from which this watcher was retrieved is being closed.
// There is no error to report, just exit from the goroutine.
Expand Down Expand Up @@ -356,7 +351,9 @@ func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.Co
return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err)
}

m.watchers = append(m.watchers, retrieved.WatchForUpdate)
if watcher, okWatcher := retrieved.(configsource.Watchable); okWatcher {
m.watchers = append(m.watchers, watcher)
}

return retrieved.Value(), nil
}
Expand Down
24 changes: 14 additions & 10 deletions config/internal/configsource/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,17 +602,17 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params
return nil, fmt.Errorf("no value for selector %q", selector)
}

watchForUpdateFn := func() error {
return configsource.ErrWatcherNotSupported
}

if entry.WatchForUpdateFn != nil {
watchForUpdateFn = entry.WatchForUpdateFn
return &watchableRetrieved{
retrieved: retrieved{
value: entry.Value,
},
watchForUpdateFn: entry.WatchForUpdateFn,
}, nil
}

return &retrieved{
value: entry.Value,
watchForUpdateFn: watchForUpdateFn,
value: entry.Value,
}, nil
}

Expand All @@ -625,8 +625,7 @@ func (t *testConfigSource) Close(context.Context) error {
}

type retrieved struct {
value interface{}
watchForUpdateFn func() error
value interface{}
}

var _ configsource.Retrieved = (*retrieved)(nil)
Expand All @@ -635,6 +634,11 @@ func (r *retrieved) Value() interface{} {
return r.value
}

func (r *retrieved) WatchForUpdate() error {
type watchableRetrieved struct {
retrieved
watchForUpdateFn func() error
}

func (r *watchableRetrieved) WatchForUpdate() error {
return r.watchForUpdateFn()
}