Skip to content

Commit

Permalink
Update code according to changes in core
Browse files Browse the repository at this point in the history
Adapt code to changes in core:
- config.experimental.configsource.Watchable interface open-telemetry/opentelemetry-collector#3792
- Rename CustomUnmarshable to Unmarshallable open-telemetry/opentelemetry-collector#3774
- Switched metrics from Value() to DoubleVal() open-telemetry/opentelemetry-collector#3740
  • Loading branch information
dmitryax committed Aug 18, 2021
1 parent b8ac117 commit ceaabc7
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 77 deletions.
35 changes: 25 additions & 10 deletions internal/configprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,14 @@ import (
"go.opentelemetry.io/collector/config/experimental/configsource"
)

// WatcherNotSupported is the a watcher function that always returns ErrWatcherNotSupported.
func WatcherNotSupported() error {
return configsource.ErrWatcherNotSupported
}

type retrieved struct {
value interface{}
watchForUpdateFn func() error
value interface{}
}

// NewRetrieved is a helper that implements the Retrieved interface.
func NewRetrieved(value interface{}, watchForUpdateFn func() error) configsource.Retrieved {
func NewRetrieved(value interface{}) configsource.Retrieved {
return &retrieved{
value,
watchForUpdateFn,
}
}

Expand All @@ -43,6 +36,28 @@ func (r *retrieved) Value() interface{} {
return r.value
}

func (r *retrieved) WatchForUpdate() error {
type watchableRetrieved struct {
retrieved
watchForUpdateFn func() error
}

// NewWatchableRetrieved is a helper that implements the Watchable interface.
func NewWatchableRetrieved(value interface{}, watchForUpdateFn func() error) configsource.Retrieved {
return &watchableRetrieved{
retrieved: retrieved{
value: value,
},
watchForUpdateFn: watchForUpdateFn,
}
}

var _ configsource.Watchable = (*watchableRetrieved)(nil)
var _ configsource.Retrieved = (*watchableRetrieved)(nil)

func (r *watchableRetrieved) Value() interface{} {
return r.retrieved.value
}

func (r *watchableRetrieved) WatchForUpdate() error {
return r.watchForUpdateFn()
}
16 changes: 7 additions & 9 deletions internal/configprovider/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type Manager struct {
// is being closed.
closeCh chan struct{}
// watchers keeps track of all WatchForUpdate functions for retrieved values.
watchers []func() error
watchers []configsource.Watchable
// watchersWG is used to ensure that Close waits for all WatchForUpdate calls
// to complete.
watchersWG sync.WaitGroup
Expand Down Expand Up @@ -237,18 +237,14 @@ func (m *Manager) WatchForUpdate() error {
doneCh := make(chan struct{})
defer close(doneCh)

for _, watcher := range m.watchers {
for i := range m.watchers {
watcher := m.watchers[i]
m.watchersWG.Add(1)
watcherFn := watcher
go func() {
defer m.watchersWG.Done()
err := watcher.WatchForUpdate()

err := watcherFn()
switch {
case errors.Is(err, configsource.ErrWatcherNotSupported):
// The watcher for the retrieved value is not supported, nothing to
// do, just exit from the goroutine.
return
case errors.Is(err, configsource.ErrSessionClosed):
// The Session from which this watcher was retrieved is being closed.
// There is no error to report, just exit from the goroutine.
Expand Down Expand Up @@ -371,7 +367,9 @@ func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.Co
return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err)
}

m.watchers = append(m.watchers, retrieved.WatchForUpdate)
if watcher, okWatcher := retrieved.(configsource.Watchable); okWatcher {
m.watchers = append(m.watchers, watcher)
}

return retrieved.Value(), nil
}
Expand Down
14 changes: 7 additions & 7 deletions internal/configprovider/testconfigsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params
return nil, fmt.Errorf("no value for selector %q", selector)
}

watchForUpdateFn := func() error {
return configsource.ErrWatcherNotSupported
}

if entry.WatchForUpdateFn != nil {
watchForUpdateFn = entry.WatchForUpdateFn
return &watchableRetrieved{
retrieved: retrieved{
value: entry.Value,
},
watchForUpdateFn: entry.WatchForUpdateFn,
}, nil
}

return &retrieved{
value: entry.Value,
watchForUpdateFn: watchForUpdateFn,
value: entry.Value,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/configsource/envvarconfigsource/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (e *envVarSession) Retrieve(_ context.Context, selector string, params inte
value, ok := os.LookupEnv(selector)
if ok {
// Environment variable found, everything is done.
return configprovider.NewRetrieved(value, configprovider.WatcherNotSupported), nil
return configprovider.NewRetrieved(value), nil
}

defaultValue, ok := e.defaults[selector]
Expand All @@ -70,7 +70,7 @@ func (e *envVarSession) Retrieve(_ context.Context, selector string, params inte
}
}

return configprovider.NewRetrieved(defaultValue, configprovider.WatcherNotSupported), nil
return configprovider.NewRetrieved(defaultValue), nil
}

func (e *envVarSession) RetrieveEnd(context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion internal/configsource/envvarconfigsource/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func TestEnvVarConfigSource_Session(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, tt.expected, r.Value())
assert.Equal(t, configsource.ErrWatcherNotSupported, r.WatchForUpdate())
_, ok := r.(configsource.Watchable)
assert.False(t, ok)
})
}
}
2 changes: 1 addition & 1 deletion internal/configsource/etcd2configsource/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *etcd2Session) Retrieve(ctx context.Context, selector string, _ interfac
watchCtx, cancel := context.WithCancel(context.Background())
s.closeFuncs = append(s.closeFuncs, cancel)

return configprovider.NewRetrieved(resp.Node.Value, s.newWatcher(watchCtx, selector, resp.Node.ModifiedIndex)), nil
return configprovider.NewWatchableRetrieved(resp.Node.Value, s.newWatcher(watchCtx, selector, resp.Node.ModifiedIndex)), nil
}

func (s *etcd2Session) RetrieveEnd(context.Context) error {
Expand Down
8 changes: 5 additions & 3 deletions internal/configsource/etcd2configsource/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestSessionRetrieve(t *testing.T) {
retrieved, err := session.Retrieve(context.Background(), c.key, nil)
if c.expect != nil {
assert.NoError(t, err)
assert.NotNil(t, retrieved.WatchForUpdate)
_, okWatcher := retrieved.(configsource.Watchable)
assert.True(t, okWatcher)
return
}
assert.Error(t, err)
Expand Down Expand Up @@ -90,7 +91,8 @@ func TestWatcher(t *testing.T) {
retrieved, err := session.Retrieve(context.Background(), "k1", nil)
assert.NoError(t, err)
assert.NotNil(t, retrieved.Value)
assert.NotNil(t, retrieved.WatchForUpdate)
retrievedWatcher, okWatcher := retrieved.(configsource.Watchable)
assert.True(t, okWatcher)
assert.False(t, watcher.closed)

go func() {
Expand All @@ -104,7 +106,7 @@ func TestWatcher(t *testing.T) {
}
}()

err = retrieved.WatchForUpdate()
err = retrievedWatcher.WatchForUpdate()

switch {
case c.close:
Expand Down
9 changes: 6 additions & 3 deletions internal/configsource/includeconfigsource/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ func (is *includeSession) Retrieve(_ context.Context, selector string, params in
}

if !is.WatchFiles {
return configprovider.NewRetrieved(buf.Bytes(), configprovider.WatcherNotSupported), nil
return configprovider.NewRetrieved(buf.Bytes()), nil
}

watchForUpdateFn, err := is.watchFile(selector)
if err != nil {
return nil, err
}

return configprovider.NewRetrieved(buf.Bytes(), watchForUpdateFn), nil
if watchForUpdateFn == nil {
return configprovider.NewRetrieved(buf.Bytes()), nil
}
return configprovider.NewWatchableRetrieved(buf.Bytes(), watchForUpdateFn), nil
}

func (is *includeSession) RetrieveEnd(context.Context) error {
Expand All @@ -91,7 +94,7 @@ func newSession(config Config) *includeSession {
}

func (is *includeSession) watchFile(file string) (func() error, error) {
watchForUpdateFn := configprovider.WatcherNotSupported
var watchForUpdateFn func() error
if _, watched := is.watchedFiles[file]; watched {
// This file is already watched another watch function is not needed.
return watchForUpdateFn, nil
Expand Down
3 changes: 2 additions & 1 deletion internal/configsource/includeconfigsource/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestIncludeConfigSource_DeleteFile(t *testing.T) {
require.NotNil(t, r)
assert.Equal(t, []byte("42"), r.Value())

assert.Equal(t, configsource.ErrWatcherNotSupported, r.WatchForUpdate())
_, ok := r.(configsource.Watchable)
assert.False(t, ok)
}

func TestIncludeConfigSource_DeleteFileError(t *testing.T) {
Expand Down
17 changes: 8 additions & 9 deletions internal/configsource/vaultconfigsource/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var _ configsource.Session = (*vaultSession)(nil)
func (v *vaultSession) Retrieve(_ context.Context, selector string, _ interface{}) (configsource.Retrieved, error) {
// By default assume that watcher is not supported. The exception will be the first
// value read from the vault secret.
watchForUpdateFn := watcherNotSupported
var watchForUpdateFn func() error

if v.secret == nil {
if err := v.readSecret(); err != nil {
Expand All @@ -79,7 +79,10 @@ func (v *vaultSession) Retrieve(_ context.Context, selector string, _ interface{
return nil, &errBadSelector{fmt.Errorf("no value at path %q for key %q", v.path, selector)}
}

return configprovider.NewRetrieved(value, watchForUpdateFn), nil
if watchForUpdateFn == nil {
return configprovider.NewRetrieved(value), nil
}
return configprovider.NewWatchableRetrieved(value, watchForUpdateFn), nil
}

func (v *vaultSession) RetrieveEnd(context.Context) error {
Expand Down Expand Up @@ -211,19 +214,19 @@ func (v *vaultSession) buildPollingWatcher() (func() error, error) {
mdValue := v.secret.Data["metadata"]
if mdValue == nil || !strings.Contains(v.path, "/data/") {
v.logger.Warn("Missing metadata to create polling watcher for vault config source", zap.String("path", v.path))
return watcherNotSupported, nil
return nil, nil
}

mdMap, ok := mdValue.(map[string]interface{})
if !ok {
v.logger.Warn("Metadata not in the expected format to create polling watcher for vault config source", zap.String("path", v.path))
return watcherNotSupported, nil
return nil, nil
}

originalVersion := v.extractVersionMetadata(mdMap, "created_time", "version")
if originalVersion == nil {
v.logger.Warn("Failed to extract version metadata to create to create polling watcher for vault config source", zap.String("path", v.path))
return watcherNotSupported, nil
return nil, nil
}

watcherFn := func() error {
Expand Down Expand Up @@ -316,7 +319,3 @@ func traverseToKey(data map[string]interface{}, key string) interface{} {
}
}
}

func watcherNotSupported() error {
return configsource.ErrWatcherNotSupported
}
Loading

0 comments on commit ceaabc7

Please sign in to comment.