From 34233e153f7469db2a9fcaf2136f663a7197345b Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 6 Aug 2021 17:47:49 -0700 Subject: [PATCH] Make configsource Watchable an optional interface Signed-off-by: Bogdan Drutu --- config/experimental/configsource/component.go | 17 ++++--------- config/internal/configsource/manager.go | 17 ++++++------- config/internal/configsource/manager_test.go | 24 +++++++++++-------- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/config/experimental/configsource/component.go b/config/experimental/configsource/component.go index c65ea9bda5c..257dfa8f191 100644 --- a/config/experimental/configsource/component.go +++ b/config/experimental/configsource/component.go @@ -31,12 +31,6 @@ var ErrSessionClosed = errors.New("parent session was closed") // specific error must use errors.Is. var ErrValueUpdated = errors.New("configuration must retrieve the updated value") -// ErrWatcherNotSupported is returned by WatchForUpdate functions when the configuration -// source can't watch for updates on the value. -// This error can be wrapped with additional information. Callers trying to identify this -// specific error must use errors.Is. -var ErrWatcherNotSupported = errors.New("value watcher is not supported") - // ConfigSource is the interface to be implemented by objects used by the collector // to retrieve external configuration information. type ConfigSource interface { @@ -84,14 +78,13 @@ type Session interface { type Retrieved interface { // Value is the retrieved data that will be injected on the configuration. Value() interface{} +} +// Watchable is an optional interface that Retrieved can implement if the given source +// supports monitoring for updates. +type Watchable interface { // WatchForUpdate is used to monitor for updates on the retrieved value. - // - // If a watcher is not supported by the configuration store in general or for the specific - // retrieved value the WatchForUpdate must immediately return ErrWatcherNotSupported or - // an error wrapping it. - // - // When watching is supported WatchForUpdate must not return until one of the following happens: + // It must not return until one of the following happens: // // 1. An update is detected for the monitored value. In this case the function should // return ErrValueUpdated or an error wrapping it. diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index ddcf8bf9909..eddcdf82ade 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -161,7 +161,7 @@ type Manager struct { // into the configuration. sessions map[string]configsource.Session // watchers keeps track of all WatchForUpdate functions for retrieved values. - watchers []func() error + watchers []configsource.Watchable // watchersWG is used to ensure that Close waits for all WatchForUpdate calls // to complete. watchersWG sync.WaitGroup @@ -220,18 +220,13 @@ func (m *Manager) WatchForUpdate() error { doneCh := make(chan struct{}) defer close(doneCh) - for _, watcher := range m.watchers { + for i := range m.watchers { + watcher := m.watchers[i] m.watchersWG.Add(1) - watcherFn := watcher go func() { defer m.watchersWG.Done() - - err := watcherFn() + err := watcher.WatchForUpdate() switch { - case errors.Is(err, configsource.ErrWatcherNotSupported): - // The watcher for the retrieved value is not supported, nothing to - // do, just exit from the goroutine. - return case errors.Is(err, configsource.ErrSessionClosed): // The Session from which this watcher was retrieved is being closed. // There is no error to report, just exit from the goroutine. @@ -356,7 +351,9 @@ func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.Co return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) } - m.watchers = append(m.watchers, retrieved.WatchForUpdate) + if watcher, okWatcher := retrieved.(configsource.Watchable); okWatcher { + m.watchers = append(m.watchers, watcher) + } return retrieved.Value(), nil } diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go index 455ab270542..9030a872a70 100644 --- a/config/internal/configsource/manager_test.go +++ b/config/internal/configsource/manager_test.go @@ -602,17 +602,17 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params return nil, fmt.Errorf("no value for selector %q", selector) } - watchForUpdateFn := func() error { - return configsource.ErrWatcherNotSupported - } - if entry.WatchForUpdateFn != nil { - watchForUpdateFn = entry.WatchForUpdateFn + return &watchableRetrieved{ + retrieved: retrieved{ + value: entry.Value, + }, + watchForUpdateFn: entry.WatchForUpdateFn, + }, nil } return &retrieved{ - value: entry.Value, - watchForUpdateFn: watchForUpdateFn, + value: entry.Value, }, nil } @@ -625,8 +625,7 @@ func (t *testConfigSource) Close(context.Context) error { } type retrieved struct { - value interface{} - watchForUpdateFn func() error + value interface{} } var _ configsource.Retrieved = (*retrieved)(nil) @@ -635,6 +634,11 @@ func (r *retrieved) Value() interface{} { return r.value } -func (r *retrieved) WatchForUpdate() error { +type watchableRetrieved struct { + retrieved + watchForUpdateFn func() error +} + +func (r *watchableRetrieved) WatchForUpdate() error { return r.watchForUpdateFn() }