Skip to content

Commit

Permalink
Make configsource Watchable an optional interface
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 7, 2021
1 parent 3a69fa6 commit 685ea29
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 33 deletions.
17 changes: 5 additions & 12 deletions config/experimental/configsource/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ var ErrSessionClosed = errors.New("parent session was closed")
// specific error must use errors.Is.
var ErrValueUpdated = errors.New("configuration must retrieve the updated value")

// ErrWatcherNotSupported is returned by WatchForUpdate functions when the configuration
// source can't watch for updates on the value.
// This error can be wrapped with additional information. Callers trying to identify this
// specific error must use errors.Is.
var ErrWatcherNotSupported = errors.New("value watcher is not supported")

// ConfigSource is the interface to be implemented by objects used by the collector
// to retrieve external configuration information.
type ConfigSource interface {
Expand Down Expand Up @@ -84,14 +78,13 @@ type Session interface {
type Retrieved interface {
// Value is the retrieved data that will be injected on the configuration.
Value() interface{}
}

// Watchable is an optional interface that Retrieved can implement if the given source
// supports monitoring for updates.
type Watchable interface {
// WatchForUpdate is used to monitor for updates on the retrieved value.
//
// If a watcher is not supported by the configuration store in general or for the specific
// retrieved value the WatchForUpdate must immediately return ErrWatcherNotSupported or
// an error wrapping it.
//
// When watching is supported WatchForUpdate must not return until one of the following happens:
// It must not return until one of the following happens:
//
// 1. An update is detected for the monitored value. In this case the function should
// return ErrValueUpdated or an error wrapping it.
Expand Down
17 changes: 7 additions & 10 deletions config/internal/configsource/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type Manager struct {
// into the configuration.
sessions map[string]configsource.Session
// watchers keeps track of all WatchForUpdate functions for retrieved values.
watchers []func() error
watchers []configsource.Watchable
// watchersWG is used to ensure that Close waits for all WatchForUpdate calls
// to complete.
watchersWG sync.WaitGroup
Expand Down Expand Up @@ -220,18 +220,13 @@ func (m *Manager) WatchForUpdate() error {
doneCh := make(chan struct{})
defer close(doneCh)

for _, watcher := range m.watchers {
for i := range m.watchers {
watcher := m.watchers[i]
m.watchersWG.Add(1)
watcherFn := watcher
go func() {
defer m.watchersWG.Done()

err := watcherFn()
err := watcher.WatchForUpdate()
switch {
case errors.Is(err, configsource.ErrWatcherNotSupported):
// The watcher for the retrieved value is not supported, nothing to
// do, just exit from the goroutine.
return
case errors.Is(err, configsource.ErrSessionClosed):
// The Session from which this watcher was retrieved is being closed.
// There is no error to report, just exit from the goroutine.
Expand Down Expand Up @@ -356,7 +351,9 @@ func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.Co
return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err)
}

m.watchers = append(m.watchers, retrieved.WatchForUpdate)
if watcher, okWatcher := retrieved.(configsource.Watchable); okWatcher {
m.watchers = append(m.watchers, watcher)
}

return retrieved.Value(), nil
}
Expand Down
24 changes: 14 additions & 10 deletions config/internal/configsource/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,17 +602,17 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params
return nil, fmt.Errorf("no value for selector %q", selector)
}

watchForUpdateFn := func() error {
return configsource.ErrWatcherNotSupported
}

if entry.WatchForUpdateFn != nil {
watchForUpdateFn = entry.WatchForUpdateFn
return &watchableRetrieved{
retrieved: retrieved{
value: entry.Value,
},
watchForUpdateFn: entry.WatchForUpdateFn,
}, nil
}

return &retrieved{
value: entry.Value,
watchForUpdateFn: watchForUpdateFn,
value: entry.Value,
}, nil
}

Expand All @@ -625,8 +625,7 @@ func (t *testConfigSource) Close(context.Context) error {
}

type retrieved struct {
value interface{}
watchForUpdateFn func() error
value interface{}
}

var _ configsource.Retrieved = (*retrieved)(nil)
Expand All @@ -635,6 +634,11 @@ func (r *retrieved) Value() interface{} {
return r.value
}

func (r *retrieved) WatchForUpdate() error {
type watchableRetrieved struct {
retrieved
watchForUpdateFn func() error
}

func (r *watchableRetrieved) WatchForUpdate() error {
return r.watchForUpdateFn()
}
2 changes: 1 addition & 1 deletion service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return
default:
col.logger.Warn("Config WatchForUpdated exited", zap.Error(err))
if err := col.reloadService(context.Background()); err != nil {
if err = col.reloadService(context.Background()); err != nil {
col.asyncErrorChannel <- err
}
}
Expand Down

0 comments on commit 685ea29

Please sign in to comment.