From ceaabc7456dc21e1390d4d33595828b6249400d0 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 17 Aug 2021 22:30:34 -0700 Subject: [PATCH] Update code according to changes in core Adapt code to changes in core: - config.experimental.configsource.Watchable interface https://github.com/open-telemetry/opentelemetry-collector/pull/3792 - Rename CustomUnmarshable to Unmarshallable https://github.com/open-telemetry/opentelemetry-collector/pull/3774 - Switched metrics from Value() to DoubleVal() https://github.com/open-telemetry/opentelemetry-collector/pull/3740 --- internal/configprovider/helpers.go | 35 ++++++++---- internal/configprovider/manager.go | 16 +++--- .../configprovider/testconfigsource_test.go | 14 ++--- .../envvarconfigsource/session.go | 4 +- .../envvarconfigsource/session_test.go | 3 +- .../configsource/etcd2configsource/session.go | 2 +- .../etcd2configsource/session_test.go | 8 +-- .../includeconfigsource/session.go | 9 ++-- .../includeconfigsource/session_test.go | 3 +- .../configsource/vaultconfigsource/session.go | 17 +++--- .../vaultconfigsource/session_test.go | 53 ++++++++++++------- .../zookeeperconfigsource/session.go | 2 +- .../zookeeperconfigsource/session_test.go | 8 +-- .../extension/smartagentextension/config.go | 2 +- .../receiver/smartagentreceiver/config.go | 2 +- .../smartagentreceiver/receiver_test.go | 10 ++-- 16 files changed, 111 insertions(+), 77 deletions(-) diff --git a/internal/configprovider/helpers.go b/internal/configprovider/helpers.go index 8704444f57..6783cb0cb1 100644 --- a/internal/configprovider/helpers.go +++ b/internal/configprovider/helpers.go @@ -19,21 +19,14 @@ import ( "go.opentelemetry.io/collector/config/experimental/configsource" ) -// WatcherNotSupported is the a watcher function that always returns ErrWatcherNotSupported. -func WatcherNotSupported() error { - return configsource.ErrWatcherNotSupported -} - type retrieved struct { - value interface{} - watchForUpdateFn func() error + value interface{} } // NewRetrieved is a helper that implements the Retrieved interface. -func NewRetrieved(value interface{}, watchForUpdateFn func() error) configsource.Retrieved { +func NewRetrieved(value interface{}) configsource.Retrieved { return &retrieved{ value, - watchForUpdateFn, } } @@ -43,6 +36,28 @@ func (r *retrieved) Value() interface{} { return r.value } -func (r *retrieved) WatchForUpdate() error { +type watchableRetrieved struct { + retrieved + watchForUpdateFn func() error +} + +// NewWatchableRetrieved is a helper that implements the Watchable interface. +func NewWatchableRetrieved(value interface{}, watchForUpdateFn func() error) configsource.Retrieved { + return &watchableRetrieved{ + retrieved: retrieved{ + value: value, + }, + watchForUpdateFn: watchForUpdateFn, + } +} + +var _ configsource.Watchable = (*watchableRetrieved)(nil) +var _ configsource.Retrieved = (*watchableRetrieved)(nil) + +func (r *watchableRetrieved) Value() interface{} { + return r.retrieved.value +} + +func (r *watchableRetrieved) WatchForUpdate() error { return r.watchForUpdateFn() } diff --git a/internal/configprovider/manager.go b/internal/configprovider/manager.go index 9efeedfccf..70c473cba8 100644 --- a/internal/configprovider/manager.go +++ b/internal/configprovider/manager.go @@ -172,7 +172,7 @@ type Manager struct { // is being closed. closeCh chan struct{} // watchers keeps track of all WatchForUpdate functions for retrieved values. - watchers []func() error + watchers []configsource.Watchable // watchersWG is used to ensure that Close waits for all WatchForUpdate calls // to complete. watchersWG sync.WaitGroup @@ -237,18 +237,14 @@ func (m *Manager) WatchForUpdate() error { doneCh := make(chan struct{}) defer close(doneCh) - for _, watcher := range m.watchers { + for i := range m.watchers { + watcher := m.watchers[i] m.watchersWG.Add(1) - watcherFn := watcher go func() { defer m.watchersWG.Done() + err := watcher.WatchForUpdate() - err := watcherFn() switch { - case errors.Is(err, configsource.ErrWatcherNotSupported): - // The watcher for the retrieved value is not supported, nothing to - // do, just exit from the goroutine. - return case errors.Is(err, configsource.ErrSessionClosed): // The Session from which this watcher was retrieved is being closed. // There is no error to report, just exit from the goroutine. @@ -371,7 +367,9 @@ func (m *Manager) expandConfigSource(ctx context.Context, cfgSrc configsource.Co return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) } - m.watchers = append(m.watchers, retrieved.WatchForUpdate) + if watcher, okWatcher := retrieved.(configsource.Watchable); okWatcher { + m.watchers = append(m.watchers, watcher) + } return retrieved.Value(), nil } diff --git a/internal/configprovider/testconfigsource_test.go b/internal/configprovider/testconfigsource_test.go index d5800901a9..71514502b7 100644 --- a/internal/configprovider/testconfigsource_test.go +++ b/internal/configprovider/testconfigsource_test.go @@ -65,17 +65,17 @@ func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params return nil, fmt.Errorf("no value for selector %q", selector) } - watchForUpdateFn := func() error { - return configsource.ErrWatcherNotSupported - } - if entry.WatchForUpdateFn != nil { - watchForUpdateFn = entry.WatchForUpdateFn + return &watchableRetrieved{ + retrieved: retrieved{ + value: entry.Value, + }, + watchForUpdateFn: entry.WatchForUpdateFn, + }, nil } return &retrieved{ - value: entry.Value, - watchForUpdateFn: watchForUpdateFn, + value: entry.Value, }, nil } diff --git a/internal/configsource/envvarconfigsource/session.go b/internal/configsource/envvarconfigsource/session.go index 9a722539ad..8fac464bf4 100644 --- a/internal/configsource/envvarconfigsource/session.go +++ b/internal/configsource/envvarconfigsource/session.go @@ -60,7 +60,7 @@ func (e *envVarSession) Retrieve(_ context.Context, selector string, params inte value, ok := os.LookupEnv(selector) if ok { // Environment variable found, everything is done. - return configprovider.NewRetrieved(value, configprovider.WatcherNotSupported), nil + return configprovider.NewRetrieved(value), nil } defaultValue, ok := e.defaults[selector] @@ -70,7 +70,7 @@ func (e *envVarSession) Retrieve(_ context.Context, selector string, params inte } } - return configprovider.NewRetrieved(defaultValue, configprovider.WatcherNotSupported), nil + return configprovider.NewRetrieved(defaultValue), nil } func (e *envVarSession) RetrieveEnd(context.Context) error { diff --git a/internal/configsource/envvarconfigsource/session_test.go b/internal/configsource/envvarconfigsource/session_test.go index 7708214b35..260d65b096 100644 --- a/internal/configsource/envvarconfigsource/session_test.go +++ b/internal/configsource/envvarconfigsource/session_test.go @@ -103,7 +103,8 @@ func TestEnvVarConfigSource_Session(t *testing.T) { require.NoError(t, err) require.NotNil(t, r) assert.Equal(t, tt.expected, r.Value()) - assert.Equal(t, configsource.ErrWatcherNotSupported, r.WatchForUpdate()) + _, ok := r.(configsource.Watchable) + assert.False(t, ok) }) } } diff --git a/internal/configsource/etcd2configsource/session.go b/internal/configsource/etcd2configsource/session.go index c7b951c517..c18551b203 100644 --- a/internal/configsource/etcd2configsource/session.go +++ b/internal/configsource/etcd2configsource/session.go @@ -55,7 +55,7 @@ func (s *etcd2Session) Retrieve(ctx context.Context, selector string, _ interfac watchCtx, cancel := context.WithCancel(context.Background()) s.closeFuncs = append(s.closeFuncs, cancel) - return configprovider.NewRetrieved(resp.Node.Value, s.newWatcher(watchCtx, selector, resp.Node.ModifiedIndex)), nil + return configprovider.NewWatchableRetrieved(resp.Node.Value, s.newWatcher(watchCtx, selector, resp.Node.ModifiedIndex)), nil } func (s *etcd2Session) RetrieveEnd(context.Context) error { diff --git a/internal/configsource/etcd2configsource/session_test.go b/internal/configsource/etcd2configsource/session_test.go index 0df845c3cf..38d4183b5b 100644 --- a/internal/configsource/etcd2configsource/session_test.go +++ b/internal/configsource/etcd2configsource/session_test.go @@ -55,7 +55,8 @@ func TestSessionRetrieve(t *testing.T) { retrieved, err := session.Retrieve(context.Background(), c.key, nil) if c.expect != nil { assert.NoError(t, err) - assert.NotNil(t, retrieved.WatchForUpdate) + _, okWatcher := retrieved.(configsource.Watchable) + assert.True(t, okWatcher) return } assert.Error(t, err) @@ -90,7 +91,8 @@ func TestWatcher(t *testing.T) { retrieved, err := session.Retrieve(context.Background(), "k1", nil) assert.NoError(t, err) assert.NotNil(t, retrieved.Value) - assert.NotNil(t, retrieved.WatchForUpdate) + retrievedWatcher, okWatcher := retrieved.(configsource.Watchable) + assert.True(t, okWatcher) assert.False(t, watcher.closed) go func() { @@ -104,7 +106,7 @@ func TestWatcher(t *testing.T) { } }() - err = retrieved.WatchForUpdate() + err = retrievedWatcher.WatchForUpdate() switch { case c.close: diff --git a/internal/configsource/includeconfigsource/session.go b/internal/configsource/includeconfigsource/session.go index cee9290149..4544f4a487 100644 --- a/internal/configsource/includeconfigsource/session.go +++ b/internal/configsource/includeconfigsource/session.go @@ -60,7 +60,7 @@ func (is *includeSession) Retrieve(_ context.Context, selector string, params in } if !is.WatchFiles { - return configprovider.NewRetrieved(buf.Bytes(), configprovider.WatcherNotSupported), nil + return configprovider.NewRetrieved(buf.Bytes()), nil } watchForUpdateFn, err := is.watchFile(selector) @@ -68,7 +68,10 @@ func (is *includeSession) Retrieve(_ context.Context, selector string, params in return nil, err } - return configprovider.NewRetrieved(buf.Bytes(), watchForUpdateFn), nil + if watchForUpdateFn == nil { + return configprovider.NewRetrieved(buf.Bytes()), nil + } + return configprovider.NewWatchableRetrieved(buf.Bytes(), watchForUpdateFn), nil } func (is *includeSession) RetrieveEnd(context.Context) error { @@ -91,7 +94,7 @@ func newSession(config Config) *includeSession { } func (is *includeSession) watchFile(file string) (func() error, error) { - watchForUpdateFn := configprovider.WatcherNotSupported + var watchForUpdateFn func() error if _, watched := is.watchedFiles[file]; watched { // This file is already watched another watch function is not needed. return watchForUpdateFn, nil diff --git a/internal/configsource/includeconfigsource/session_test.go b/internal/configsource/includeconfigsource/session_test.go index 1632c93e75..1c9cb2b87e 100644 --- a/internal/configsource/includeconfigsource/session_test.go +++ b/internal/configsource/includeconfigsource/session_test.go @@ -107,7 +107,8 @@ func TestIncludeConfigSource_DeleteFile(t *testing.T) { require.NotNil(t, r) assert.Equal(t, []byte("42"), r.Value()) - assert.Equal(t, configsource.ErrWatcherNotSupported, r.WatchForUpdate()) + _, ok := r.(configsource.Watchable) + assert.False(t, ok) } func TestIncludeConfigSource_DeleteFileError(t *testing.T) { diff --git a/internal/configsource/vaultconfigsource/session.go b/internal/configsource/vaultconfigsource/session.go index 9372eb371a..7c3bf3788c 100644 --- a/internal/configsource/vaultconfigsource/session.go +++ b/internal/configsource/vaultconfigsource/session.go @@ -58,7 +58,7 @@ var _ configsource.Session = (*vaultSession)(nil) func (v *vaultSession) Retrieve(_ context.Context, selector string, _ interface{}) (configsource.Retrieved, error) { // By default assume that watcher is not supported. The exception will be the first // value read from the vault secret. - watchForUpdateFn := watcherNotSupported + var watchForUpdateFn func() error if v.secret == nil { if err := v.readSecret(); err != nil { @@ -79,7 +79,10 @@ func (v *vaultSession) Retrieve(_ context.Context, selector string, _ interface{ return nil, &errBadSelector{fmt.Errorf("no value at path %q for key %q", v.path, selector)} } - return configprovider.NewRetrieved(value, watchForUpdateFn), nil + if watchForUpdateFn == nil { + return configprovider.NewRetrieved(value), nil + } + return configprovider.NewWatchableRetrieved(value, watchForUpdateFn), nil } func (v *vaultSession) RetrieveEnd(context.Context) error { @@ -211,19 +214,19 @@ func (v *vaultSession) buildPollingWatcher() (func() error, error) { mdValue := v.secret.Data["metadata"] if mdValue == nil || !strings.Contains(v.path, "/data/") { v.logger.Warn("Missing metadata to create polling watcher for vault config source", zap.String("path", v.path)) - return watcherNotSupported, nil + return nil, nil } mdMap, ok := mdValue.(map[string]interface{}) if !ok { v.logger.Warn("Metadata not in the expected format to create polling watcher for vault config source", zap.String("path", v.path)) - return watcherNotSupported, nil + return nil, nil } originalVersion := v.extractVersionMetadata(mdMap, "created_time", "version") if originalVersion == nil { v.logger.Warn("Failed to extract version metadata to create to create polling watcher for vault config source", zap.String("path", v.path)) - return watcherNotSupported, nil + return nil, nil } watcherFn := func() error { @@ -316,7 +319,3 @@ func traverseToKey(data map[string]interface{}, key string) interface{} { } } } - -func watcherNotSupported() error { - return configsource.ErrWatcherNotSupported -} diff --git a/internal/configsource/vaultconfigsource/session_test.go b/internal/configsource/vaultconfigsource/session_test.go index fbaf9b1c1a..10ef06e8e5 100644 --- a/internal/configsource/vaultconfigsource/session_test.go +++ b/internal/configsource/vaultconfigsource/session_test.go @@ -93,11 +93,14 @@ func TestVaultSessionForKV(t *testing.T) { require.NoError(t, s.RetrieveEnd(context.Background())) + watcher, ok := retrieved.(configsource.Watchable) + require.True(t, ok) + var watcherErr error doneCh := make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrieved.WatchForUpdate() + watcherErr = watcher.WatchForUpdate() }() require.NoError(t, s.Close(context.Background())) @@ -142,15 +145,19 @@ func TestVaultPollingKVUpdate(t *testing.T) { // RetrieveEnd require.NoError(t, s.RetrieveEnd(context.Background())) + watcherK0, ok := retrievedK0.(configsource.Watchable) + require.True(t, ok) + // Only the first retrieved key provides a working watcher. - require.Equal(t, configsource.ErrWatcherNotSupported, retrievedK1.WatchForUpdate()) + _, ok = retrievedK1.(configsource.Watchable) + require.False(t, ok) var watcherErr error var doneCh chan struct{} doneCh = make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrievedK0.WatchForUpdate() + watcherErr = watcherK0.WatchForUpdate() }() requireCmdRun(t, updateKVStore) @@ -172,11 +179,14 @@ func TestVaultPollingKVUpdate(t *testing.T) { require.NoError(t, err) require.Equal(t, "v1.1", retrievedUpdatedK1.Value().(string)) + watcherUpdatedK1, ok := retrievedUpdatedK1.(configsource.Watchable) + require.True(t, ok) + // Wait for close. doneCh = make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrievedUpdatedK1.WatchForUpdate() + watcherErr = watcherUpdatedK1.WatchForUpdate() }() require.NoError(t, s.Close(context.Background())) @@ -223,10 +233,14 @@ func TestVaultRenewableSecret(t *testing.T) { // RetrieveEnd require.NoError(t, s.RetrieveEnd(context.Background())) + watcherUser, ok := retrievedUser.(configsource.Watchable) + require.True(t, ok) + // Only the first retrieved key provides a working watcher. - require.Equal(t, configsource.ErrWatcherNotSupported, retrievedPwd.WatchForUpdate()) + _, ok = retrievedPwd.(configsource.Watchable) + require.False(t, ok) - watcherErr := retrievedUser.WatchForUpdate() + watcherErr := watcherUser.WatchForUpdate() require.ErrorIs(t, watcherErr, configsource.ErrValueUpdated) // Close current session. @@ -247,11 +261,14 @@ func TestVaultRenewableSecret(t *testing.T) { require.NoError(t, err) require.NotEqual(t, retrievedPwd.Value(), retrievedUpdatedPwd.Value()) + watcherUpdatedPwd, ok := retrievedUpdatedPwd.(configsource.Watchable) + require.True(t, ok) + // Wait for close. doneCh := make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrievedUpdatedUser.WatchForUpdate() + watcherErr = watcherUpdatedPwd.WatchForUpdate() }() runtime.Gosched() @@ -293,12 +310,15 @@ func TestVaultV1SecretWithTTL(t *testing.T) { // RetrieveEnd require.NoError(t, s.RetrieveEnd(context.Background())) + watcher, ok := retrievedValue.(configsource.Watchable) + require.True(t, ok) + var watcherErr error var doneCh chan struct{} doneCh = make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrievedValue.WatchForUpdate() + watcherErr = watcher.WatchForUpdate() }() // Wait for update. @@ -318,11 +338,14 @@ func TestVaultV1SecretWithTTL(t *testing.T) { require.NoError(t, err) require.Equal(t, "s3cr3t", retrievedValue.Value().(string)) + watcher, ok = retrievedValue.(configsource.Watchable) + require.True(t, ok) + // Wait for close. doneCh = make(chan struct{}) go func() { defer close(doneCh) - watcherErr = retrievedValue.WatchForUpdate() + watcherErr = watcher.WatchForUpdate() }() require.NoError(t, s.Close(context.Background())) @@ -363,16 +386,8 @@ func TestVaultV1NonWatchableSecret(t *testing.T) { // RetrieveEnd require.NoError(t, s.RetrieveEnd(context.Background())) - var watcherErr error - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - watcherErr = retrievedValue.WatchForUpdate() - }() - - // Wait for update. - <-doneCh - require.ErrorIs(t, watcherErr, configsource.ErrWatcherNotSupported) + _, ok := retrievedValue.(configsource.Watchable) + require.False(t, ok) // Close current session. require.NoError(t, s.Close(context.Background())) diff --git a/internal/configsource/zookeeperconfigsource/session.go b/internal/configsource/zookeeperconfigsource/session.go index be47a5ac93..101b1bc22c 100644 --- a/internal/configsource/zookeeperconfigsource/session.go +++ b/internal/configsource/zookeeperconfigsource/session.go @@ -53,7 +53,7 @@ func (s *zkSession) Retrieve(ctx context.Context, selector string, _ interface{} return nil, err } - return configprovider.NewRetrieved(value, newWatcher(watchCh, s.closeCh)), nil + return configprovider.NewWatchableRetrieved(value, newWatcher(watchCh, s.closeCh)), nil } func (s *zkSession) RetrieveEnd(context.Context) error { diff --git a/internal/configsource/zookeeperconfigsource/session_test.go b/internal/configsource/zookeeperconfigsource/session_test.go index 639b9919f0..dc7ad1c9d6 100644 --- a/internal/configsource/zookeeperconfigsource/session_test.go +++ b/internal/configsource/zookeeperconfigsource/session_test.go @@ -54,7 +54,8 @@ func TestSessionRetrieve(t *testing.T) { retrieved, err := session.Retrieve(context.Background(), c.key, nil) if c.expect != nil { assert.NoError(t, err) - assert.NotNil(t, retrieved.WatchForUpdate) + _, okWatcher := retrieved.(configsource.Watchable) + assert.True(t, okWatcher) return } assert.Error(t, err) @@ -89,7 +90,8 @@ func TestWatcher(t *testing.T) { retrieved, err := session.Retrieve(context.Background(), "k1", nil) assert.NoError(t, err) assert.NotNil(t, retrieved.Value) - assert.NotNil(t, retrieved.WatchForUpdate) + retrievedWatcher, okWatcher := retrieved.(configsource.Watchable) + assert.True(t, okWatcher) assert.Contains(t, conn.watches, "k1") watcher := conn.watches["k1"] @@ -108,7 +110,7 @@ func TestWatcher(t *testing.T) { } }() - err = retrieved.WatchForUpdate() + err = retrievedWatcher.WatchForUpdate() switch { case c.close: assert.ErrorIs(t, err, configsource.ErrSessionClosed) diff --git a/internal/extension/smartagentextension/config.go b/internal/extension/smartagentextension/config.go index df377b0560..f0b0868a05 100644 --- a/internal/extension/smartagentextension/config.go +++ b/internal/extension/smartagentextension/config.go @@ -25,7 +25,7 @@ import ( "gopkg.in/yaml.v2" ) -var _ config.CustomUnmarshable = (*Config)(nil) +var _ config.Unmarshallable = (*Config)(nil) type Config struct { config.ExtensionSettings `mapstructure:",squash"` diff --git a/internal/receiver/smartagentreceiver/config.go b/internal/receiver/smartagentreceiver/config.go index e04c5e9f5e..977ea47286 100644 --- a/internal/receiver/smartagentreceiver/config.go +++ b/internal/receiver/smartagentreceiver/config.go @@ -32,7 +32,7 @@ import ( const defaultIntervalSeconds = 10 -var _ config.CustomUnmarshable = (*Config)(nil) +var _ config.Unmarshallable = (*Config)(nil) var errDimensionClientValue = fmt.Errorf("dimensionClients must be an array of compatible exporter names") diff --git a/internal/receiver/smartagentreceiver/receiver_test.go b/internal/receiver/smartagentreceiver/receiver_test.go index 8783573fb8..f5a1b5c69d 100644 --- a/internal/receiver/smartagentreceiver/receiver_test.go +++ b/internal/receiver/smartagentreceiver/receiver_test.go @@ -125,18 +125,16 @@ func TestSmartAgentReceiver(t *testing.T) { for l := 0; l < dg.DataPoints().Len(); l++ { dgdp := dg.DataPoints().At(l) labels = dgdp.LabelsMap() - var val interface{} = dgdp.Value() - _, ok := val.(float64) - assert.True(t, ok, "invalid value of MetricDataTypeGauge metric %s", name) + var val = dgdp.DoubleVal() + assert.NotEqual(t, val, 0, "invalid value of MetricDataTypeGauge metric %s", name) } case pdata.MetricDataTypeSum: ds := metric.Sum() for l := 0; l < ds.DataPoints().Len(); l++ { dsdp := ds.DataPoints().At(l) labels = dsdp.LabelsMap() - var val interface{} = dsdp.Value() - _, ok := val.(float64) - assert.True(t, ok, "invalid value of MetricDataTypeSum metric %s", name) + var val float64 = dsdp.DoubleVal() + assert.NotEqual(t, val, 0, "invalid value of MetricDataTypeSum metric %s", name) } default: t.Errorf("unexpected type %#v for metric %s", metric.DataType(), name)