From 51828597cc82c7c7ca41bbd3df059cb899064205 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 27 Mar 2024 14:24:05 -0400 Subject: [PATCH 01/27] custom message interface --- extension/opampextension/custom_messages.go | 25 +++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 extension/opampextension/custom_messages.go diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go new file mode 100644 index 0000000000000..0fbf1bac90d62 --- /dev/null +++ b/extension/opampextension/custom_messages.go @@ -0,0 +1,25 @@ +package opampextension + +import "github.com/open-telemetry/opamp-go/protobufs" + +// CustomMessageCallback is a callback that handles a custom message from opamp. +type CustomMessageCallback func(*protobufs.CustomMessage) + +// CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. +type CustomCapabilityRegistry interface { + // Register registers a new custom capability. Any messages for the capability + // will be received by the given callback asynchronously. + // It returns a handle to a CustomCapability, which can be used to unregister, or send + // a message to the OpAMP server. + Register(capability string, callback CustomMessageCallback) CustomCapability +} + +// CustomCapability represents a handle to a custom capability. +// This can be used to send a custom message to an OpAMP server, or to unregister +// the capability from the registry. +type CustomCapability interface { + // SendMessage sends a custom message to the OpAMP server. + SendMessage(messageType string, message []byte) error + // Unregister unregisters the custom capability. + Unregister() +} From b5fd947754dc9e70f2e92bbe9a9d1985272b801b Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 1 Apr 2024 15:01:31 -0400 Subject: [PATCH 02/27] opamp custom messages prototype --- extension/opampextension/custom_messages.go | 19 ++- extension/opampextension/opamp_agent.go | 22 ++- extension/opampextension/registry.go | 170 ++++++++++++++++++++ 3 files changed, 201 insertions(+), 10 deletions(-) create mode 100644 extension/opampextension/registry.go diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index 0fbf1bac90d62..0ee5b68fcc447 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -1,6 +1,8 @@ package opampextension -import "github.com/open-telemetry/opamp-go/protobufs" +import ( + "github.com/open-telemetry/opamp-go/protobufs" +) // CustomMessageCallback is a callback that handles a custom message from opamp. type CustomMessageCallback func(*protobufs.CustomMessage) @@ -11,7 +13,7 @@ type CustomCapabilityRegistry interface { // will be received by the given callback asynchronously. // It returns a handle to a CustomCapability, which can be used to unregister, or send // a message to the OpAMP server. - Register(capability string, callback CustomMessageCallback) CustomCapability + Register(capability string, callback CustomMessageCallback) (CustomCapability, error) } // CustomCapability represents a handle to a custom capability. @@ -19,7 +21,18 @@ type CustomCapabilityRegistry interface { // the capability from the registry. type CustomCapability interface { // SendMessage sends a custom message to the OpAMP server. - SendMessage(messageType string, message []byte) error + // + // Only one message can be sent at a time. If SendCustomMessage has been already called + // for any capability, and the message is still pending (in progress) + // then subsequent calls to SendCustomMessage will return github.com/open-telemetry/opamp-go/types.ErrCustomMessagePending + // and a channel that will be closed when the pending message is sent. + // To ensure that the previous send is complete and it is safe to send another CustomMessage, + // the caller should wait for the returned channel to be closed before attempting to send another custom message. + // + // If no error is returned, the channel returned will be closed after the specified + // message is sent. + SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) + // Unregister unregisters the custom capability. Unregister() } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 968bd216392ec..2b5c6bf007fba 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -43,6 +43,8 @@ type opampAgent struct { agentDescription *protobufs.AgentDescription opampClient client.OpAMPClient + + customCapabilityRegistry *customCapabilityRegistry } func (o *opampAgent) Start(ctx context.Context, _ component.Host) error { @@ -121,6 +123,10 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } +func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { + return o.customCapabilityRegistry.Register(capability, callback) +} + func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { o.eclk.Lock() defer o.eclk.Unlock() @@ -162,14 +168,16 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r } } + opampClient := cfg.Server.GetClient(logger) agent := &opampAgent{ - cfg: cfg, - logger: logger, - agentType: agentType, - agentVersion: agentVersion, - instanceID: uid, - capabilities: cfg.Capabilities, - opampClient: cfg.Server.GetClient(logger), + cfg: cfg, + logger: logger, + agentType: agentType, + agentVersion: agentVersion, + instanceID: uid, + capabilities: cfg.Capabilities, + opampClient: opampClient, + customCapabilityRegistry: newCustomCapabilityRegistry(logger, opampClient), } return agent, nil diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go new file mode 100644 index 0000000000000..05ca47e04e874 --- /dev/null +++ b/extension/opampextension/registry.go @@ -0,0 +1,170 @@ +package opampextension + +import ( + "container/list" + "errors" + "fmt" + "slices" + "sync" + + "github.com/open-telemetry/opamp-go/client" + "github.com/open-telemetry/opamp-go/protobufs" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +type customCapabilityRegistry struct { + mux *sync.Mutex + capabilityToCallbacks map[string]*list.List + opampClient client.OpAMPClient + logger *zap.Logger +} + +func newCustomCapabilityRegistry(logger *zap.Logger, opampClient client.OpAMPClient) *customCapabilityRegistry { + return &customCapabilityRegistry{ + mux: &sync.Mutex{}, + capabilityToCallbacks: make(map[string]*list.List), + opampClient: opampClient, + logger: logger, + } +} + +// Register registers a new custom capability for the +func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { + cr.mux.Lock() + defer cr.mux.Unlock() + + capabilities := cr.capabilities() + if !slices.Contains(capabilities, capability) { + capabilities = append(capabilities, capability) + } + + err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ + Capabilities: capabilities, + }) + if err != nil { + return nil, fmt.Errorf("set custom capabilities: %w", err) + } + + capabilityList := cr.capabilityToCallbacks[capability] + if capabilityList == nil { + capabilityList = list.New() + cr.capabilityToCallbacks[capability] = capabilityList + } + + callbackElem := capabilityList.PushBack(callback) + + cc := newCustomCapability(cr, cr.opampClient, capability) + + // note: We'll have to set the self element in order for the custom capability to remove itself. + cc.selfElement = callbackElem + + return cc, nil +} + +// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for +// the messages capability. +func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { + cr.mux.Lock() + defer cr.mux.Unlock() + + callbacks, ok := cr.capabilityToCallbacks[cm.Capability] + if !ok { + return + } + + for node := callbacks.Front(); node != nil; node = node.Next() { + cb, ok := node.Value.(CustomMessageCallback) + if !ok { + continue + } + + // Let the callback process asynchronously in a separate goroutine so it can't block + // the opamp extension + go cb(cm) + } +} + +// RemoveCustomCapability removes the custom capability with the given callback list element, +// then recalculates and sets the list of custom capabilities on the OpAMP client. +func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, callbackElement *list.Element) { + cr.mux.Lock() + defer cr.mux.Unlock() + + callbackList := cr.capabilityToCallbacks[capability] + callbackList.Remove(callbackElement) + + if callbackList.Front() == nil { + // Since there are no more callbacks for this capability, + // this capability is no longer supported + delete(cr.capabilityToCallbacks, capability) + } + + capabilities := cr.capabilities() + err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ + Capabilities: capabilities, + }) + if err != nil { + // It's OK if we couldn't actually remove the capability, it just means we won't + // notify the server properly, and the server may send us messages that we have no associated callbacks for. + cr.logger.Error("Failed to set new capabilities", zap.Error(err)) + } +} + +// capabilities gives the current set of custom capabilities with at least one +// callback registered. +func (cr *customCapabilityRegistry) capabilities() []string { + return maps.Keys(cr.capabilityToCallbacks) +} + +type customCapabilityHandler struct { + // unregisteredMux protects unregistered, and makes sure that a message cannot be sent + // on an unregistered capability. + unregisteredMux *sync.Mutex + + capability string + selfElement *list.Element + opampClient client.OpAMPClient + registry *customCapabilityRegistry + unregistered bool +} + +func newCustomCapability( + registry *customCapabilityRegistry, + opampClient client.OpAMPClient, + capability string, +) *customCapabilityHandler { + return &customCapabilityHandler{ + unregisteredMux: &sync.Mutex{}, + + capability: capability, + opampClient: opampClient, + registry: registry, + } +} + +// SendMessage synchronously sends the message +func (c *customCapabilityHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + if c.unregistered { + return nil, errors.New("capability has already been unregistered") + } + + cm := &protobufs.CustomMessage{ + Capability: c.capability, + Type: messageType, + Data: message, + } + + return c.opampClient.SendCustomMessage(cm) +} + +func (c *customCapabilityHandler) Unregister() { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + c.unregistered = true + c.registry.RemoveCustomCapability(c.capability, c.selfElement) +} From 3901617493ce7dbfe6b280f344587ecfd9fc0380 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 9 Apr 2024 15:02:40 -0400 Subject: [PATCH 03/27] finish registry + tests --- extension/opampextension/registry.go | 23 +- extension/opampextension/registry_test.go | 272 ++++++++++++++++++++++ 2 files changed, 286 insertions(+), 9 deletions(-) create mode 100644 extension/opampextension/registry_test.go diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index 05ca47e04e874..1e17a0e8542af 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -7,24 +7,29 @@ import ( "slices" "sync" - "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/protobufs" "go.uber.org/zap" "golang.org/x/exp/maps" ) +// customCapabilityClient is a subset of OpAMP client containing only the +type customCapabilityClient interface { + SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error + SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) +} + type customCapabilityRegistry struct { mux *sync.Mutex capabilityToCallbacks map[string]*list.List - opampClient client.OpAMPClient + client customCapabilityClient logger *zap.Logger } -func newCustomCapabilityRegistry(logger *zap.Logger, opampClient client.OpAMPClient) *customCapabilityRegistry { +func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ mux: &sync.Mutex{}, capabilityToCallbacks: make(map[string]*list.List), - opampClient: opampClient, + client: client, logger: logger, } } @@ -39,7 +44,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM capabilities = append(capabilities, capability) } - err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ + err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: capabilities, }) if err != nil { @@ -54,7 +59,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM callbackElem := capabilityList.PushBack(callback) - cc := newCustomCapability(cr, cr.opampClient, capability) + cc := newCustomCapability(cr, cr.client, capability) // note: We'll have to set the self element in order for the custom capability to remove itself. cc.selfElement = callbackElem @@ -101,7 +106,7 @@ func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, ca } capabilities := cr.capabilities() - err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{ + err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ Capabilities: capabilities, }) if err != nil { @@ -124,14 +129,14 @@ type customCapabilityHandler struct { capability string selfElement *list.Element - opampClient client.OpAMPClient + opampClient customCapabilityClient registry *customCapabilityRegistry unregistered bool } func newCustomCapability( registry *customCapabilityRegistry, - opampClient client.OpAMPClient, + opampClient customCapabilityClient, capability string, ) *customCapabilityHandler { return &customCapabilityHandler{ diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go new file mode 100644 index 0000000000000..36441f998f989 --- /dev/null +++ b/extension/opampextension/registry_test.go @@ -0,0 +1,272 @@ +package opampextension + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestRegistry_Register(t *testing.T) { + t.Run("Registers successfully", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + + client := mockCustomCapabilityClient{ + setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error { + require.Equal(t, + &protobufs.CustomCapabilities{ + Capabilities: []string{capabilityString}, + }, + customCapabilities) + return nil + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + require.NoError(t, err) + require.NotNil(t, capability) + }) + + t.Run("Setting capabilities fails", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + capabilityErr := errors.New("network error") + + client := mockCustomCapabilityClient{ + setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error { + return capabilityErr + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + require.Nil(t, capability) + require.ErrorIs(t, err, capabilityErr) + require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") + }) +} + +func TestRegistry_ProcessMessage(t *testing.T) { + t.Run("Calls registered callback", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + callbackCalledChan := make(chan struct{}) + capability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + require.Equal(t, customMessage, c) + + close(callbackCalledChan) + }) + require.NotNil(t, capability) + require.NoError(t, err) + + registry.ProcessMessage(customMessage) + select { + case <-time.After(2 * time.Second): + t.Fatalf("Timed out waiting for callback to be called") + case <-callbackCalledChan: // OK + } + + }) + + t.Run("Callback is called only for its own capability", func(t *testing.T) { + teapotCapabilityString1 := "io.opentelemetry.teapot" + coffeeMakerCapabilityString2 := "io.opentelemetry.coffeeMaker" + + messageType1 := "steep" + messageBytes1 := []byte("blackTea") + + messageType2 := "brew" + messageBytes2 := []byte("blackCoffee") + + customMessageSteep := &protobufs.CustomMessage{ + Capability: teapotCapabilityString1, + Type: messageType1, + Data: messageBytes1, + } + + customMessageBrew := &protobufs.CustomMessage{ + Capability: coffeeMakerCapabilityString2, + Type: messageType2, + Data: messageBytes2, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + teapotCalledChan := make(chan struct{}) + capabilityTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { + require.Equal(t, customMessageSteep, c) + + close(teapotCalledChan) + }) + require.NotNil(t, capabilityTeapot) + require.NoError(t, err) + + coffeeMakerCalledChan := make(chan struct{}) + capabilityCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { + require.Equal(t, customMessageBrew, c) + + close(coffeeMakerCalledChan) + }) + require.NotNil(t, capabilityCoffeeMaker) + require.NoError(t, err) + + registry.ProcessMessage(customMessageSteep) + registry.ProcessMessage(customMessageBrew) + + select { + case <-time.After(2 * time.Second): + t.Fatalf("Timed out waiting for callback 1 to be called") + case <-coffeeMakerCalledChan: // OK + } + select { + case <-time.After(2 * time.Second): + t.Fatalf("Timed out waiting for callback 2 to be called") + case <-coffeeMakerCalledChan: // OK + } + }) +} + +func TestCustomCapability_SendMesage(t *testing.T) { + t.Run("Sends message", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "brew" + mesageBytes := []byte("black") + + client := mockCustomCapabilityClient{ + sendCustomMessage: func(message *protobufs.CustomMessage) (chan struct{}, error) { + require.Equal(t, &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + }, message) + return nil, nil + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + capability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) + require.NoError(t, err) + require.NotNil(t, capability) + + channel, err := capability.SendMessage(messageType, mesageBytes) + require.NoError(t, err) + require.Nil(t, channel, nil) + }) +} + +func TestCustomCapability_Unregister(t *testing.T) { + t.Run("Unregistered capability callback is no longer called", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) + require.NoError(t, err) + + unregisteredCapability.Unregister() + + registry.ProcessMessage(customMessage) + }) + + t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := &mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) + require.NoError(t, err) + + client.setCustomCapabilites = func(customCapabilities *protobufs.CustomCapabilities) error { + return fmt.Errorf("failed to set capabilities") + } + + unregisteredCapability.Unregister() + + registry.ProcessMessage(customMessage) + }) + + t.Run("Does not send if unregistered", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) + require.NoError(t, err) + + unregisteredCapability.Unregister() + + _, err = unregisteredCapability.SendMessage(messageType, mesageBytes) + require.ErrorContains(t, err, "capability has already been unregistered") + }) +} + +type mockCustomCapabilityClient struct { + sendCustomMessage func(message *protobufs.CustomMessage) (chan struct{}, error) + setCustomCapabilites func(customCapabilities *protobufs.CustomCapabilities) error +} + +func (m mockCustomCapabilityClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error { + if m.setCustomCapabilites != nil { + return m.setCustomCapabilites(customCapabilities) + } + return nil +} + +func (m mockCustomCapabilityClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + if m.sendCustomMessage != nil { + return m.sendCustomMessage(message) + } + + return make(chan struct{}), nil +} From b83d7c1f1a47c40666dcd491e8d60047e6271860 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 9 Apr 2024 15:21:30 -0400 Subject: [PATCH 04/27] ensure extension is a valid registry --- extension/opampextension/opamp_agent.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 2b5c6bf007fba..d54c905d17fc8 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -47,6 +47,8 @@ type opampAgent struct { customCapabilityRegistry *customCapabilityRegistry } +var _ CustomCapabilityRegistry = (*opampAgent)(nil) + func (o *opampAgent) Start(ctx context.Context, _ component.Host) error { header := http.Header{} for k, v := range o.cfg.Server.GetHeaders() { From 0c1fe09c8ff225b162331db9424b1be8a69b7ccf Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 9 Apr 2024 15:23:30 -0400 Subject: [PATCH 05/27] add license --- extension/opampextension/custom_messages.go | 3 +++ extension/opampextension/registry.go | 3 +++ extension/opampextension/registry_test.go | 3 +++ 3 files changed, 9 insertions(+) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index 0ee5b68fcc447..49fbbdf0ddad1 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package opampextension import ( diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index 1e17a0e8542af..c68e4c41ba522 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package opampextension import ( diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index 36441f998f989..a720de020177f 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package opampextension import ( From 8b5f549e144005578c6796040c7daf3d4322610d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 9 Apr 2024 16:00:34 -0400 Subject: [PATCH 06/27] add check for unregistered in Unregister --- extension/opampextension/registry.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index c68e4c41ba522..ede021a09ed79 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -173,6 +173,10 @@ func (c *customCapabilityHandler) Unregister() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() + if c.unregistered { + return + } + c.unregistered = true c.registry.RemoveCustomCapability(c.capability, c.selfElement) } From e204bcfbf1caceb46d74410fe28a36050287f70d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 10 Apr 2024 09:33:41 -0400 Subject: [PATCH 07/27] process custom messages with the registry when received. --- extension/opampextension/opamp_agent.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index d54c905d17fc8..6a261a7a05c33 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -266,15 +266,17 @@ func (o *opampAgent) composeEffectiveConfig() *protobufs.EffectiveConfig { } func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { - if msg.AgentIdentification == nil { - return - } + if msg.AgentIdentification != nil { + instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) + if err != nil { + o.logger.Error("Failed to parse a new agent identity", zap.Error(err)) + return + } - instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) - if err != nil { - o.logger.Error("Failed to parse a new agent identity", zap.Error(err)) - return + o.updateAgentIdentity(instanceID) } - o.updateAgentIdentity(instanceID) + if msg.CustomMessage != nil { + o.customCapabilityRegistry.ProcessMessage(msg.CustomMessage) + } } From 47f1987c7d2293c3a3ff1d04f8479e43bedcc2ad Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 10 Apr 2024 09:41:41 -0400 Subject: [PATCH 08/27] add chlog --- .../feat_opamp-extension-custom-messages.yaml | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .chloggen/feat_opamp-extension-custom-messages.yaml diff --git a/.chloggen/feat_opamp-extension-custom-messages.yaml b/.chloggen/feat_opamp-extension-custom-messages.yaml new file mode 100644 index 0000000000000..20c4b18e07506 --- /dev/null +++ b/.chloggen/feat_opamp-extension-custom-messages.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added support for other components to register custom capabilities and receive custom messages from an opamp extension" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32189] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] From e277407a4f669fdac0ee9db7e8145d4d1c278ac4 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 10 Apr 2024 12:10:11 -0400 Subject: [PATCH 09/27] add readme section about custom messages --- extension/opampextension/README.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index 20264a951cfa2..c60519c4de12c 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -43,6 +43,30 @@ extensions: endpoint: wss://127.0.0.1:4320/v1/opamp ``` +## Custom Messages + +Other components may use a configured OpAMP extension to send and receive custom messages to and from an OpAMP server. Components may use the provided `components.Host` from the Start method in order to get a handle to the registry: + +```go +func Start(_ context.Context, host component.Host) error { + ext, ok := host.GetExtensions()[opampExtensionID] + if !ok { + return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) + } + + registry, ok := ext.(opampextension.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not an custom message registry", opampExtensionID) + } + + // You can now use registry.Register to register a custom capability + + return nil +} +``` + +See the [custom_messages.go](./custom_messages.go) for more information on the custom message API. + ## Status This OpenTelemetry OpAMP agent extension is intended to support the [OpAMP From 5ece3766c36111d16a39ab8e72600dd0cae1baa1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 10 Apr 2024 16:17:27 -0400 Subject: [PATCH 10/27] fix lint errors --- extension/opampextension/registry_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index a720de020177f..e86172afbbaec 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -41,7 +41,7 @@ func TestRegistry_Register(t *testing.T) { capabilityErr := errors.New("network error") client := mockCustomCapabilityClient{ - setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error { + setCustomCapabilites: func(_ *protobufs.CustomCapabilities) error { return capabilityErr }, } @@ -192,7 +192,7 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) @@ -217,13 +217,13 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) require.NoError(t, err) - client.setCustomCapabilites = func(customCapabilities *protobufs.CustomCapabilities) error { + client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { return fmt.Errorf("failed to set capabilities") } @@ -241,7 +241,7 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) From 4a2d18a539aece76b6f74da4e3eb23ea8e1e51eb Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 15 Apr 2024 10:51:58 -0400 Subject: [PATCH 11/27] make goporto --- extension/opampextension/custom_messages.go | 2 +- extension/opampextension/registry.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index 49fbbdf0ddad1..becefeb3f9c67 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package opampextension +package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" import ( "github.com/open-telemetry/opamp-go/protobufs" diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index ede021a09ed79..a164a19c5e6ca 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package opampextension +package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" import ( "container/list" From af558b19dbbeaa13efcc713f7c980e83968756d1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 16 Apr 2024 10:11:52 -0400 Subject: [PATCH 12/27] fix incorrect tracking issue --- .chloggen/feat_opamp-extension-custom-messages.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/feat_opamp-extension-custom-messages.yaml b/.chloggen/feat_opamp-extension-custom-messages.yaml index 20c4b18e07506..7e16c6f7aa7b0 100644 --- a/.chloggen/feat_opamp-extension-custom-messages.yaml +++ b/.chloggen/feat_opamp-extension-custom-messages.yaml @@ -10,7 +10,7 @@ component: opampextension note: "Added support for other components to register custom capabilities and receive custom messages from an opamp extension" # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [32189] +issues: [32021] # If your change doesn't affect end users or the exported elements of any package, # you should instead start your pull request title with [chore] or use the "Skip Changelog" label. From df759d478e5f24fe954f95f1fc91fc8ae9574db3 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 16 Apr 2024 22:31:04 -0400 Subject: [PATCH 13/27] Rework interface based on feedback --- extension/opampextension/custom_messages.go | 26 ++-- extension/opampextension/opamp_agent.go | 31 ++-- extension/opampextension/registry.go | 114 +++++++------- extension/opampextension/registry_test.go | 155 ++++++++++++-------- 4 files changed, 184 insertions(+), 142 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index becefeb3f9c67..b623fa217d8f5 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -7,22 +7,29 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) -// CustomMessageCallback is a callback that handles a custom message from opamp. +// CustomMessageCallback is a callback that handles a custom message from OpAMP. type CustomMessageCallback func(*protobufs.CustomMessage) // CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. type CustomCapabilityRegistry interface { // Register registers a new custom capability. Any messages for the capability // will be received by the given callback asynchronously. - // It returns a handle to a CustomCapability, which can be used to unregister, or send - // a message to the OpAMP server. - Register(capability string, callback CustomMessageCallback) (CustomCapability, error) + // It returns a a CustomCapabilitySender, which can be used to send a message to the OpAMP server using the capability. + Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) } -// CustomCapability represents a handle to a custom capability. -// This can be used to send a custom message to an OpAMP server, or to unregister -// the capability from the registry. -type CustomCapability interface { +// CustomCapabilityListener is an interface that receives custom messages. +type CustomCapabilityListener interface { + // ReceiveMessage is called whenever a message for the capability is received from the OpAMP server. + ReceiveMessage(msg *protobufs.CustomMessage) + // Done returns a channel signaling that the listener is done listening. Some time after a signal is emitted on the channel, + // messages will no longer be received by ReceiveMessage for this listener. + Done() <-chan struct{} +} + +// CustomMessageSender can be used to send a custom message to an OpAMP server. +// The capability the message is sent for is the capability that was used in CustomCapabilityRegistry.Register. +type CustomMessageSender interface { // SendMessage sends a custom message to the OpAMP server. // // Only one message can be sent at a time. If SendCustomMessage has been already called @@ -35,7 +42,4 @@ type CustomCapability interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) - - // Unregister unregisters the custom capability. - Unregister() } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 6a261a7a05c33..95658762e04c1 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -5,6 +5,7 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "context" + "errors" "net/http" "os" "runtime" @@ -102,19 +103,25 @@ func (o *opampAgent) Start(ctx context.Context, _ component.Host) error { return nil } -func (o *opampAgent) Shutdown(ctx context.Context) error { +func (o *opampAgent) Shutdown(ctx context.Context) (returnedErr error) { o.logger.Debug("OpAMP agent shutting down...") - if o.opampClient == nil { - return nil + + if o.opampClient != nil { + o.logger.Debug("Stopping OpAMP client...") + err := o.opampClient.Stop(ctx) + // Opamp-go considers this an error, but the collector does not. + // https://github.com/open-telemetry/opamp-go/issues/255 + if err != nil && !strings.EqualFold(err.Error(), "cannot stop because not started") { + returnedErr = errors.Join(returnedErr, err) + } } - o.logger.Debug("Stopping OpAMP client...") - err := o.opampClient.Stop(ctx) - // Opamp-go considers this an error, but the collector does not. - // https://github.com/open-telemetry/opamp-go/issues/255 - if err != nil && strings.EqualFold(err.Error(), "cannot stop because not started") { - return nil + + if o.customCapabilityRegistry != nil { + o.logger.Debug("Stopping custom message registry...") + o.customCapabilityRegistry.Stop() } - return err + + return returnedErr } func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { @@ -125,8 +132,8 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } -func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { - return o.customCapabilityRegistry.Register(capability, callback) +func (o *opampAgent) Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) { + return o.customCapabilityRegistry.Register(capability, listener) } func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index a164a19c5e6ca..ae8abe7a029b3 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -5,7 +5,6 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "container/list" - "errors" "fmt" "slices" "sync" @@ -15,30 +14,35 @@ import ( "golang.org/x/exp/maps" ) -// customCapabilityClient is a subset of OpAMP client containing only the +// customCapabilityClient is a subset of OpAMP client containing only the methods needed by the customCapabilityRegistry type customCapabilityClient interface { SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) } +// customCapabilityRegistry implements CustomCapabilityRegistry. type customCapabilityRegistry struct { mux *sync.Mutex - capabilityToCallbacks map[string]*list.List + capabilityToListeners map[string]*list.List client customCapabilityClient logger *zap.Logger + done chan struct{} + listenerWg *sync.WaitGroup } func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ mux: &sync.Mutex{}, - capabilityToCallbacks: make(map[string]*list.List), + listenerWg: &sync.WaitGroup{}, + done: make(chan struct{}), + capabilityToListeners: make(map[string]*list.List), client: client, logger: logger, } } -// Register registers a new custom capability for the -func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { +// Register registers a new listener for the custom capability. +func (cr *customCapabilityRegistry) Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) { cr.mux.Lock() defer cr.mux.Unlock() @@ -54,18 +58,18 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM return nil, fmt.Errorf("set custom capabilities: %w", err) } - capabilityList := cr.capabilityToCallbacks[capability] + capabilityList := cr.capabilityToListeners[capability] if capabilityList == nil { capabilityList = list.New() - cr.capabilityToCallbacks[capability] = capabilityList + cr.capabilityToListeners[capability] = capabilityList } - callbackElem := capabilityList.PushBack(callback) + listenerElem := capabilityList.PushBack(listener) - cc := newCustomCapability(cr, cr.client, capability) + cr.listenerWg.Add(1) + go cr.waitForListenerDone(capability, listenerElem) - // note: We'll have to set the self element in order for the custom capability to remove itself. - cc.selfElement = callbackElem + cc := newCustomCapabilitySender(cr.client, capability) return cc, nil } @@ -76,36 +80,54 @@ func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() - callbacks, ok := cr.capabilityToCallbacks[cm.Capability] + callbacks, ok := cr.capabilityToListeners[cm.Capability] if !ok { return } for node := callbacks.Front(); node != nil; node = node.Next() { - cb, ok := node.Value.(CustomMessageCallback) + listener, ok := node.Value.(CustomCapabilityListener) if !ok { continue } // Let the callback process asynchronously in a separate goroutine so it can't block // the opamp extension - go cb(cm) + go listener.ReceiveMessage(cm) } } -// RemoveCustomCapability removes the custom capability with the given callback list element, +// waitForListenerDone waits for the CustomCapabilityListener inside the listenerElem to emit a Done signal down it's channel, +// at which point the listener will be unregistered. +func (cr *customCapabilityRegistry) waitForListenerDone(capability string, listenerElem *list.Element) { + defer cr.listenerWg.Done() + + listener := listenerElem.Value.(CustomCapabilityListener) + + select { + case <-listener.Done(): + case <-cr.done: + // This means the whole extension is shutting down, so we don't need to modify the custom capabilities + // (since we are disconnected at this point), and we just need to clean up this goroutine. + return + } + + cr.removeCustomCapabilityListener(capability, listenerElem) +} + +// removeCustomCapabilityListener removes the custom capability with the given callback list element, // then recalculates and sets the list of custom capabilities on the OpAMP client. -func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, callbackElement *list.Element) { +func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability string, listenerElement *list.Element) { cr.mux.Lock() defer cr.mux.Unlock() - callbackList := cr.capabilityToCallbacks[capability] - callbackList.Remove(callbackElement) + callbackList := cr.capabilityToListeners[capability] + callbackList.Remove(listenerElement) if callbackList.Front() == nil { // Since there are no more callbacks for this capability, // this capability is no longer supported - delete(cr.capabilityToCallbacks, capability) + delete(cr.capabilityToListeners, capability) } capabilities := cr.capabilities() @@ -122,44 +144,32 @@ func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, ca // capabilities gives the current set of custom capabilities with at least one // callback registered. func (cr *customCapabilityRegistry) capabilities() []string { - return maps.Keys(cr.capabilityToCallbacks) + return maps.Keys(cr.capabilityToListeners) } -type customCapabilityHandler struct { - // unregisteredMux protects unregistered, and makes sure that a message cannot be sent - // on an unregistered capability. - unregisteredMux *sync.Mutex +// Stop unregisters all registered capabilities, freeing all goroutines occupied waiting for listeners to emit their Done signal. +func (cr *customCapabilityRegistry) Stop() { + close(cr.done) + cr.listenerWg.Wait() +} - capability string - selfElement *list.Element - opampClient customCapabilityClient - registry *customCapabilityRegistry - unregistered bool +// customCapabilitySender implements +type customCapabilitySender struct { + capability string + opampClient customCapabilityClient } -func newCustomCapability( - registry *customCapabilityRegistry, +func newCustomCapabilitySender( opampClient customCapabilityClient, capability string, -) *customCapabilityHandler { - return &customCapabilityHandler{ - unregisteredMux: &sync.Mutex{}, - +) *customCapabilitySender { + return &customCapabilitySender{ capability: capability, opampClient: opampClient, - registry: registry, } } -// SendMessage synchronously sends the message -func (c *customCapabilityHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { - c.unregisteredMux.Lock() - defer c.unregisteredMux.Unlock() - - if c.unregistered { - return nil, errors.New("capability has already been unregistered") - } - +func (c *customCapabilitySender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { cm := &protobufs.CustomMessage{ Capability: c.capability, Type: messageType, @@ -168,15 +178,3 @@ func (c *customCapabilityHandler) SendMessage(messageType string, message []byte return c.opampClient.SendCustomMessage(cm) } - -func (c *customCapabilityHandler) Unregister() { - c.unregisteredMux.Lock() - defer c.unregisteredMux.Unlock() - - if c.unregistered { - return - } - - c.unregistered = true - c.registry.RemoveCustomCapability(c.capability, c.selfElement) -} diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index e86172afbbaec..d6c975ab85a14 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -30,10 +30,13 @@ func TestRegistry_Register(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() - capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) + + sender, err := registry.Register(capabilityString, l) require.NoError(t, err) - require.NotNil(t, capability) + require.NotNil(t, sender) }) t.Run("Setting capabilities fails", func(t *testing.T) { @@ -47,11 +50,14 @@ func TestRegistry_Register(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() + + l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) - require.Nil(t, capability) + sender, err := registry.Register(capabilityString, l) + require.Nil(t, sender) require.ErrorIs(t, err, capabilityErr) - require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") + require.Len(t, registry.capabilityToListeners, 0, "Setting capability failed, but callback ended up in the map anyways") }) } @@ -69,14 +75,18 @@ func TestRegistry_ProcessMessage(t *testing.T) { client := mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() callbackCalledChan := make(chan struct{}) - capability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + + l := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { require.Equal(t, customMessage, c) close(callbackCalledChan) }) - require.NotNil(t, capability) + + sender, err := registry.Register(capabilityString, l) + require.NotNil(t, sender) require.NoError(t, err) registry.ProcessMessage(customMessage) @@ -113,23 +123,30 @@ func TestRegistry_ProcessMessage(t *testing.T) { client := mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() teapotCalledChan := make(chan struct{}) - capabilityTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { + + teapotListener := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { require.Equal(t, customMessageSteep, c) close(teapotCalledChan) }) - require.NotNil(t, capabilityTeapot) + + teapotSender, err := registry.Register(teapotCapabilityString1, teapotListener) + require.NotNil(t, teapotSender) require.NoError(t, err) coffeeMakerCalledChan := make(chan struct{}) - capabilityCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { + + coffeeMakerListener := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { require.Equal(t, customMessageBrew, c) close(coffeeMakerCalledChan) }) - require.NotNil(t, capabilityCoffeeMaker) + + coffeeMakerSender, err := registry.Register(coffeeMakerCapabilityString2, coffeeMakerListener) + require.NotNil(t, coffeeMakerSender) require.NoError(t, err) registry.ProcessMessage(customMessageSteep) @@ -166,12 +183,15 @@ func TestCustomCapability_SendMesage(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() - capability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) + l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) + + sender, err := registry.Register(capabilityString, l) require.NoError(t, err) - require.NotNil(t, capability) + require.NotNil(t, sender) - channel, err := capability.SendMessage(messageType, mesageBytes) + channel, err := sender.SendMessage(messageType, mesageBytes) require.NoError(t, err) require.Nil(t, channel, nil) }) @@ -180,77 +200,70 @@ func TestCustomCapability_SendMesage(t *testing.T) { func TestCustomCapability_Unregister(t *testing.T) { t.Run("Unregistered capability callback is no longer called", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" - messageType := "steep" - mesageBytes := []byte("blackTea") - customMessage := &protobufs.CustomMessage{ - Capability: capabilityString, - Type: messageType, - Data: mesageBytes, - } - client := mockCustomCapabilityClient{} + client := &mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) - require.NotNil(t, unregisteredCapability) + l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) + + client.setCustomCapabilites = func(cc *protobufs.CustomCapabilities) error { + require.Equal(t, &protobufs.CustomCapabilities{ + Capabilities: []string{capabilityString}, + }, cc) + + return nil + } + + sender, err := registry.Register(capabilityString, l) + require.NotNil(t, sender) require.NoError(t, err) - unregisteredCapability.Unregister() + client.setCustomCapabilites = func(cc *protobufs.CustomCapabilities) error { + require.Equal(t, &protobufs.CustomCapabilities{ + Capabilities: []string{}, + }, cc) - registry.ProcessMessage(customMessage) + return nil + } + + close(l.doneChan) + + require.Eventually(t, func() bool { + registry.mux.Lock() + defer registry.mux.Unlock() + + return len(registry.capabilityToListeners) == 0 + }, 2*time.Second, 100*time.Millisecond) }) t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" - messageType := "steep" - mesageBytes := []byte("blackTea") - customMessage := &protobufs.CustomMessage{ - Capability: capabilityString, - Type: messageType, - Data: mesageBytes, - } client := &mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) + defer registry.Stop() - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) - require.NotNil(t, unregisteredCapability) + l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) + + sender, err := registry.Register(capabilityString, l) + require.NotNil(t, sender) require.NoError(t, err) client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { return fmt.Errorf("failed to set capabilities") } - unregisteredCapability.Unregister() - - registry.ProcessMessage(customMessage) - }) - - t.Run("Does not send if unregistered", func(t *testing.T) { - capabilityString := "io.opentelemetry.teapot" - messageType := "steep" - mesageBytes := []byte("blackTea") - - client := mockCustomCapabilityClient{} - - registry := newCustomCapabilityRegistry(zap.NewNop(), client) - - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) - require.NotNil(t, unregisteredCapability) - require.NoError(t, err) + close(l.doneChan) - unregisteredCapability.Unregister() + require.Eventually(t, func() bool { + registry.mux.Lock() + defer registry.mux.Unlock() - _, err = unregisteredCapability.SendMessage(messageType, mesageBytes) - require.ErrorContains(t, err, "capability has already been unregistered") + return len(registry.capabilityToListeners) == 0 + }, 2*time.Second, 100*time.Millisecond) }) } @@ -273,3 +286,23 @@ func (m mockCustomCapabilityClient) SendCustomMessage(message *protobufs.CustomM return make(chan struct{}), nil } + +type mockCustomCapabilityListener struct { + doneChan chan struct{} + receiveFunc func(*protobufs.CustomMessage) +} + +func newMockCustomCapabilityListener(recieveFunc func(*protobufs.CustomMessage)) *mockCustomCapabilityListener { + return &mockCustomCapabilityListener{ + doneChan: make(chan struct{}), + receiveFunc: recieveFunc, + } +} + +func (m *mockCustomCapabilityListener) ReceiveMessage(cm *protobufs.CustomMessage) { + m.receiveFunc(cm) +} + +func (m *mockCustomCapabilityListener) Done() <-chan struct{} { + return m.doneChan +} From c31315c352799babd11adb0039663d7bed5e49be Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 16 Apr 2024 22:34:59 -0400 Subject: [PATCH 14/27] callback -> listener --- extension/opampextension/registry.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index ae8abe7a029b3..e1dda2468f398 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -74,24 +74,24 @@ func (cr *customCapabilityRegistry) Register(capability string, listener CustomC return cc, nil } -// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for +// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered listeners for // the messages capability. func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() - callbacks, ok := cr.capabilityToListeners[cm.Capability] + listeners, ok := cr.capabilityToListeners[cm.Capability] if !ok { return } - for node := callbacks.Front(); node != nil; node = node.Next() { + for node := listeners.Front(); node != nil; node = node.Next() { listener, ok := node.Value.(CustomCapabilityListener) if !ok { continue } - // Let the callback process asynchronously in a separate goroutine so it can't block + // Let the listener process asynchronously in a separate goroutine so it can't block // the opamp extension go listener.ReceiveMessage(cm) } @@ -115,17 +115,17 @@ func (cr *customCapabilityRegistry) waitForListenerDone(capability string, liste cr.removeCustomCapabilityListener(capability, listenerElem) } -// removeCustomCapabilityListener removes the custom capability with the given callback list element, +// removeCustomCapabilityListener removes the custom capability with the given listener list element, // then recalculates and sets the list of custom capabilities on the OpAMP client. func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability string, listenerElement *list.Element) { cr.mux.Lock() defer cr.mux.Unlock() - callbackList := cr.capabilityToListeners[capability] - callbackList.Remove(listenerElement) + listenerList := cr.capabilityToListeners[capability] + listenerList.Remove(listenerElement) - if callbackList.Front() == nil { - // Since there are no more callbacks for this capability, + if listenerList.Front() == nil { + // Since there are no more listeners for this capability, // this capability is no longer supported delete(cr.capabilityToListeners, capability) } @@ -136,13 +136,13 @@ func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability st }) if err != nil { // It's OK if we couldn't actually remove the capability, it just means we won't - // notify the server properly, and the server may send us messages that we have no associated callbacks for. + // notify the server properly, and the server may send us messages that we have no associated listeners for. cr.logger.Error("Failed to set new capabilities", zap.Error(err)) } } // capabilities gives the current set of custom capabilities with at least one -// callback registered. +// listener registered. func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToListeners) } From 0aa8fba3a2185c131158e647d5d2593f24469ad5 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Apr 2024 15:03:01 -0400 Subject: [PATCH 15/27] Revert "callback -> listener" This reverts commit af136737fcc6ce6082ebc91ebcbf4c909578a096. --- extension/opampextension/registry.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index e1dda2468f398..ae8abe7a029b3 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -74,24 +74,24 @@ func (cr *customCapabilityRegistry) Register(capability string, listener CustomC return cc, nil } -// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered listeners for +// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for // the messages capability. func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() - listeners, ok := cr.capabilityToListeners[cm.Capability] + callbacks, ok := cr.capabilityToListeners[cm.Capability] if !ok { return } - for node := listeners.Front(); node != nil; node = node.Next() { + for node := callbacks.Front(); node != nil; node = node.Next() { listener, ok := node.Value.(CustomCapabilityListener) if !ok { continue } - // Let the listener process asynchronously in a separate goroutine so it can't block + // Let the callback process asynchronously in a separate goroutine so it can't block // the opamp extension go listener.ReceiveMessage(cm) } @@ -115,17 +115,17 @@ func (cr *customCapabilityRegistry) waitForListenerDone(capability string, liste cr.removeCustomCapabilityListener(capability, listenerElem) } -// removeCustomCapabilityListener removes the custom capability with the given listener list element, +// removeCustomCapabilityListener removes the custom capability with the given callback list element, // then recalculates and sets the list of custom capabilities on the OpAMP client. func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability string, listenerElement *list.Element) { cr.mux.Lock() defer cr.mux.Unlock() - listenerList := cr.capabilityToListeners[capability] - listenerList.Remove(listenerElement) + callbackList := cr.capabilityToListeners[capability] + callbackList.Remove(listenerElement) - if listenerList.Front() == nil { - // Since there are no more listeners for this capability, + if callbackList.Front() == nil { + // Since there are no more callbacks for this capability, // this capability is no longer supported delete(cr.capabilityToListeners, capability) } @@ -136,13 +136,13 @@ func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability st }) if err != nil { // It's OK if we couldn't actually remove the capability, it just means we won't - // notify the server properly, and the server may send us messages that we have no associated listeners for. + // notify the server properly, and the server may send us messages that we have no associated callbacks for. cr.logger.Error("Failed to set new capabilities", zap.Error(err)) } } // capabilities gives the current set of custom capabilities with at least one -// listener registered. +// callback registered. func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToListeners) } From 9f7de5964d9d1721e2919dd0fdf7d32f878b53a1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Apr 2024 15:03:16 -0400 Subject: [PATCH 16/27] Revert "Rework interface based on feedback" This reverts commit ff81b43c07ea2c5b33e440e85cddc1dc41aac65b. --- extension/opampextension/custom_messages.go | 26 ++-- extension/opampextension/opamp_agent.go | 31 ++-- extension/opampextension/registry.go | 114 +++++++------- extension/opampextension/registry_test.go | 155 ++++++++------------ 4 files changed, 142 insertions(+), 184 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index b623fa217d8f5..becefeb3f9c67 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -7,29 +7,22 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) -// CustomMessageCallback is a callback that handles a custom message from OpAMP. +// CustomMessageCallback is a callback that handles a custom message from opamp. type CustomMessageCallback func(*protobufs.CustomMessage) // CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. type CustomCapabilityRegistry interface { // Register registers a new custom capability. Any messages for the capability // will be received by the given callback asynchronously. - // It returns a a CustomCapabilitySender, which can be used to send a message to the OpAMP server using the capability. - Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) + // It returns a handle to a CustomCapability, which can be used to unregister, or send + // a message to the OpAMP server. + Register(capability string, callback CustomMessageCallback) (CustomCapability, error) } -// CustomCapabilityListener is an interface that receives custom messages. -type CustomCapabilityListener interface { - // ReceiveMessage is called whenever a message for the capability is received from the OpAMP server. - ReceiveMessage(msg *protobufs.CustomMessage) - // Done returns a channel signaling that the listener is done listening. Some time after a signal is emitted on the channel, - // messages will no longer be received by ReceiveMessage for this listener. - Done() <-chan struct{} -} - -// CustomMessageSender can be used to send a custom message to an OpAMP server. -// The capability the message is sent for is the capability that was used in CustomCapabilityRegistry.Register. -type CustomMessageSender interface { +// CustomCapability represents a handle to a custom capability. +// This can be used to send a custom message to an OpAMP server, or to unregister +// the capability from the registry. +type CustomCapability interface { // SendMessage sends a custom message to the OpAMP server. // // Only one message can be sent at a time. If SendCustomMessage has been already called @@ -42,4 +35,7 @@ type CustomMessageSender interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) + + // Unregister unregisters the custom capability. + Unregister() } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 95658762e04c1..6a261a7a05c33 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -5,7 +5,6 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "context" - "errors" "net/http" "os" "runtime" @@ -103,25 +102,19 @@ func (o *opampAgent) Start(ctx context.Context, _ component.Host) error { return nil } -func (o *opampAgent) Shutdown(ctx context.Context) (returnedErr error) { +func (o *opampAgent) Shutdown(ctx context.Context) error { o.logger.Debug("OpAMP agent shutting down...") - - if o.opampClient != nil { - o.logger.Debug("Stopping OpAMP client...") - err := o.opampClient.Stop(ctx) - // Opamp-go considers this an error, but the collector does not. - // https://github.com/open-telemetry/opamp-go/issues/255 - if err != nil && !strings.EqualFold(err.Error(), "cannot stop because not started") { - returnedErr = errors.Join(returnedErr, err) - } + if o.opampClient == nil { + return nil } - - if o.customCapabilityRegistry != nil { - o.logger.Debug("Stopping custom message registry...") - o.customCapabilityRegistry.Stop() + o.logger.Debug("Stopping OpAMP client...") + err := o.opampClient.Stop(ctx) + // Opamp-go considers this an error, but the collector does not. + // https://github.com/open-telemetry/opamp-go/issues/255 + if err != nil && strings.EqualFold(err.Error(), "cannot stop because not started") { + return nil } - - return returnedErr + return err } func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { @@ -132,8 +125,8 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } -func (o *opampAgent) Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) { - return o.customCapabilityRegistry.Register(capability, listener) +func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { + return o.customCapabilityRegistry.Register(capability, callback) } func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index ae8abe7a029b3..a164a19c5e6ca 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -5,6 +5,7 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "container/list" + "errors" "fmt" "slices" "sync" @@ -14,35 +15,30 @@ import ( "golang.org/x/exp/maps" ) -// customCapabilityClient is a subset of OpAMP client containing only the methods needed by the customCapabilityRegistry +// customCapabilityClient is a subset of OpAMP client containing only the type customCapabilityClient interface { SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) } -// customCapabilityRegistry implements CustomCapabilityRegistry. type customCapabilityRegistry struct { mux *sync.Mutex - capabilityToListeners map[string]*list.List + capabilityToCallbacks map[string]*list.List client customCapabilityClient logger *zap.Logger - done chan struct{} - listenerWg *sync.WaitGroup } func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ mux: &sync.Mutex{}, - listenerWg: &sync.WaitGroup{}, - done: make(chan struct{}), - capabilityToListeners: make(map[string]*list.List), + capabilityToCallbacks: make(map[string]*list.List), client: client, logger: logger, } } -// Register registers a new listener for the custom capability. -func (cr *customCapabilityRegistry) Register(capability string, listener CustomCapabilityListener) (CustomMessageSender, error) { +// Register registers a new custom capability for the +func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { cr.mux.Lock() defer cr.mux.Unlock() @@ -58,18 +54,18 @@ func (cr *customCapabilityRegistry) Register(capability string, listener CustomC return nil, fmt.Errorf("set custom capabilities: %w", err) } - capabilityList := cr.capabilityToListeners[capability] + capabilityList := cr.capabilityToCallbacks[capability] if capabilityList == nil { capabilityList = list.New() - cr.capabilityToListeners[capability] = capabilityList + cr.capabilityToCallbacks[capability] = capabilityList } - listenerElem := capabilityList.PushBack(listener) + callbackElem := capabilityList.PushBack(callback) - cr.listenerWg.Add(1) - go cr.waitForListenerDone(capability, listenerElem) + cc := newCustomCapability(cr, cr.client, capability) - cc := newCustomCapabilitySender(cr.client, capability) + // note: We'll have to set the self element in order for the custom capability to remove itself. + cc.selfElement = callbackElem return cc, nil } @@ -80,54 +76,36 @@ func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() - callbacks, ok := cr.capabilityToListeners[cm.Capability] + callbacks, ok := cr.capabilityToCallbacks[cm.Capability] if !ok { return } for node := callbacks.Front(); node != nil; node = node.Next() { - listener, ok := node.Value.(CustomCapabilityListener) + cb, ok := node.Value.(CustomMessageCallback) if !ok { continue } // Let the callback process asynchronously in a separate goroutine so it can't block // the opamp extension - go listener.ReceiveMessage(cm) + go cb(cm) } } -// waitForListenerDone waits for the CustomCapabilityListener inside the listenerElem to emit a Done signal down it's channel, -// at which point the listener will be unregistered. -func (cr *customCapabilityRegistry) waitForListenerDone(capability string, listenerElem *list.Element) { - defer cr.listenerWg.Done() - - listener := listenerElem.Value.(CustomCapabilityListener) - - select { - case <-listener.Done(): - case <-cr.done: - // This means the whole extension is shutting down, so we don't need to modify the custom capabilities - // (since we are disconnected at this point), and we just need to clean up this goroutine. - return - } - - cr.removeCustomCapabilityListener(capability, listenerElem) -} - -// removeCustomCapabilityListener removes the custom capability with the given callback list element, +// RemoveCustomCapability removes the custom capability with the given callback list element, // then recalculates and sets the list of custom capabilities on the OpAMP client. -func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability string, listenerElement *list.Element) { +func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, callbackElement *list.Element) { cr.mux.Lock() defer cr.mux.Unlock() - callbackList := cr.capabilityToListeners[capability] - callbackList.Remove(listenerElement) + callbackList := cr.capabilityToCallbacks[capability] + callbackList.Remove(callbackElement) if callbackList.Front() == nil { // Since there are no more callbacks for this capability, // this capability is no longer supported - delete(cr.capabilityToListeners, capability) + delete(cr.capabilityToCallbacks, capability) } capabilities := cr.capabilities() @@ -144,32 +122,44 @@ func (cr *customCapabilityRegistry) removeCustomCapabilityListener(capability st // capabilities gives the current set of custom capabilities with at least one // callback registered. func (cr *customCapabilityRegistry) capabilities() []string { - return maps.Keys(cr.capabilityToListeners) + return maps.Keys(cr.capabilityToCallbacks) } -// Stop unregisters all registered capabilities, freeing all goroutines occupied waiting for listeners to emit their Done signal. -func (cr *customCapabilityRegistry) Stop() { - close(cr.done) - cr.listenerWg.Wait() -} +type customCapabilityHandler struct { + // unregisteredMux protects unregistered, and makes sure that a message cannot be sent + // on an unregistered capability. + unregisteredMux *sync.Mutex -// customCapabilitySender implements -type customCapabilitySender struct { - capability string - opampClient customCapabilityClient + capability string + selfElement *list.Element + opampClient customCapabilityClient + registry *customCapabilityRegistry + unregistered bool } -func newCustomCapabilitySender( +func newCustomCapability( + registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string, -) *customCapabilitySender { - return &customCapabilitySender{ +) *customCapabilityHandler { + return &customCapabilityHandler{ + unregisteredMux: &sync.Mutex{}, + capability: capability, opampClient: opampClient, + registry: registry, } } -func (c *customCapabilitySender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { +// SendMessage synchronously sends the message +func (c *customCapabilityHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + if c.unregistered { + return nil, errors.New("capability has already been unregistered") + } + cm := &protobufs.CustomMessage{ Capability: c.capability, Type: messageType, @@ -178,3 +168,15 @@ func (c *customCapabilitySender) SendMessage(messageType string, message []byte) return c.opampClient.SendCustomMessage(cm) } + +func (c *customCapabilityHandler) Unregister() { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + if c.unregistered { + return + } + + c.unregistered = true + c.registry.RemoveCustomCapability(c.capability, c.selfElement) +} diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index d6c975ab85a14..e86172afbbaec 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -30,13 +30,10 @@ func TestRegistry_Register(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() - l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - - sender, err := registry.Register(capabilityString, l) + capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) require.NoError(t, err) - require.NotNil(t, sender) + require.NotNil(t, capability) }) t.Run("Setting capabilities fails", func(t *testing.T) { @@ -50,14 +47,11 @@ func TestRegistry_Register(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() - - l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - sender, err := registry.Register(capabilityString, l) - require.Nil(t, sender) + capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + require.Nil(t, capability) require.ErrorIs(t, err, capabilityErr) - require.Len(t, registry.capabilityToListeners, 0, "Setting capability failed, but callback ended up in the map anyways") + require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") }) } @@ -75,18 +69,14 @@ func TestRegistry_ProcessMessage(t *testing.T) { client := mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() callbackCalledChan := make(chan struct{}) - - l := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { + capability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { require.Equal(t, customMessage, c) close(callbackCalledChan) }) - - sender, err := registry.Register(capabilityString, l) - require.NotNil(t, sender) + require.NotNil(t, capability) require.NoError(t, err) registry.ProcessMessage(customMessage) @@ -123,30 +113,23 @@ func TestRegistry_ProcessMessage(t *testing.T) { client := mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() teapotCalledChan := make(chan struct{}) - - teapotListener := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { + capabilityTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageSteep, c) close(teapotCalledChan) }) - - teapotSender, err := registry.Register(teapotCapabilityString1, teapotListener) - require.NotNil(t, teapotSender) + require.NotNil(t, capabilityTeapot) require.NoError(t, err) coffeeMakerCalledChan := make(chan struct{}) - - coffeeMakerListener := newMockCustomCapabilityListener(func(c *protobufs.CustomMessage) { + capabilityCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageBrew, c) close(coffeeMakerCalledChan) }) - - coffeeMakerSender, err := registry.Register(coffeeMakerCapabilityString2, coffeeMakerListener) - require.NotNil(t, coffeeMakerSender) + require.NotNil(t, capabilityCoffeeMaker) require.NoError(t, err) registry.ProcessMessage(customMessageSteep) @@ -183,15 +166,12 @@ func TestCustomCapability_SendMesage(t *testing.T) { } registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() - l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - - sender, err := registry.Register(capabilityString, l) + capability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) require.NoError(t, err) - require.NotNil(t, sender) + require.NotNil(t, capability) - channel, err := sender.SendMessage(messageType, mesageBytes) + channel, err := capability.SendMessage(messageType, mesageBytes) require.NoError(t, err) require.Nil(t, channel, nil) }) @@ -200,70 +180,77 @@ func TestCustomCapability_SendMesage(t *testing.T) { func TestCustomCapability_Unregister(t *testing.T) { t.Run("Unregistered capability callback is no longer called", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } - client := &mockCustomCapabilityClient{} + client := mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() - - l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - client.setCustomCapabilites = func(cc *protobufs.CustomCapabilities) error { - require.Equal(t, &protobufs.CustomCapabilities{ - Capabilities: []string{capabilityString}, - }, cc) - - return nil - } - - sender, err := registry.Register(capabilityString, l) - require.NotNil(t, sender) + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) require.NoError(t, err) - client.setCustomCapabilites = func(cc *protobufs.CustomCapabilities) error { - require.Equal(t, &protobufs.CustomCapabilities{ - Capabilities: []string{}, - }, cc) + unregisteredCapability.Unregister() - return nil - } - - close(l.doneChan) - - require.Eventually(t, func() bool { - registry.mux.Lock() - defer registry.mux.Unlock() - - return len(registry.capabilityToListeners) == 0 - }, 2*time.Second, 100*time.Millisecond) + registry.ProcessMessage(customMessage) }) t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } client := &mockCustomCapabilityClient{} registry := newCustomCapabilityRegistry(zap.NewNop(), client) - defer registry.Stop() - - l := newMockCustomCapabilityListener(func(_ *protobufs.CustomMessage) {}) - sender, err := registry.Register(capabilityString, l) - require.NotNil(t, sender) + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) require.NoError(t, err) client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { return fmt.Errorf("failed to set capabilities") } - close(l.doneChan) + unregisteredCapability.Unregister() + + registry.ProcessMessage(customMessage) + }) + + t.Run("Does not send if unregistered", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + t.Fatalf("Unregistered capability should not be called") + }) + require.NotNil(t, unregisteredCapability) + require.NoError(t, err) - require.Eventually(t, func() bool { - registry.mux.Lock() - defer registry.mux.Unlock() + unregisteredCapability.Unregister() - return len(registry.capabilityToListeners) == 0 - }, 2*time.Second, 100*time.Millisecond) + _, err = unregisteredCapability.SendMessage(messageType, mesageBytes) + require.ErrorContains(t, err, "capability has already been unregistered") }) } @@ -286,23 +273,3 @@ func (m mockCustomCapabilityClient) SendCustomMessage(message *protobufs.CustomM return make(chan struct{}), nil } - -type mockCustomCapabilityListener struct { - doneChan chan struct{} - receiveFunc func(*protobufs.CustomMessage) -} - -func newMockCustomCapabilityListener(recieveFunc func(*protobufs.CustomMessage)) *mockCustomCapabilityListener { - return &mockCustomCapabilityListener{ - doneChan: make(chan struct{}), - receiveFunc: recieveFunc, - } -} - -func (m *mockCustomCapabilityListener) ReceiveMessage(cm *protobufs.CustomMessage) { - m.receiveFunc(cm) -} - -func (m *mockCustomCapabilityListener) Done() <-chan struct{} { - return m.doneChan -} From bcb17e35efc49de36dc7accac65a67ef16ca05f7 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Apr 2024 15:32:36 -0400 Subject: [PATCH 17/27] refactor to a loose unregister function --- extension/opampextension/custom_messages.go | 13 ++-- extension/opampextension/opamp_agent.go | 2 +- extension/opampextension/registry.go | 74 +++++++++++---------- extension/opampextension/registry_test.go | 34 ++++++---- 4 files changed, 66 insertions(+), 57 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index becefeb3f9c67..5bdccf9e0771d 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -16,13 +16,13 @@ type CustomCapabilityRegistry interface { // will be received by the given callback asynchronously. // It returns a handle to a CustomCapability, which can be used to unregister, or send // a message to the OpAMP server. - Register(capability string, callback CustomMessageCallback) (CustomCapability, error) + // It also returns a function that can be used to unregister the capability. + Register(capability string, callback CustomMessageCallback) (sender CustomMessageSender, unregister func(), err error) } -// CustomCapability represents a handle to a custom capability. -// This can be used to send a custom message to an OpAMP server, or to unregister -// the capability from the registry. -type CustomCapability interface { +// CustomMessageSender represents a handle to a custom capability. +// This can be used to send a custom message to an OpAMP server. +type CustomMessageSender interface { // SendMessage sends a custom message to the OpAMP server. // // Only one message can be sent at a time. If SendCustomMessage has been already called @@ -35,7 +35,4 @@ type CustomCapability interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) - - // Unregister unregisters the custom capability. - Unregister() } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 6a261a7a05c33..a71b4ed13a40e 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -125,7 +125,7 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } -func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { +func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomMessageSender, func(), error) { return o.customCapabilityRegistry.Register(capability, callback) } diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index a164a19c5e6ca..85f76ad1e9e7c 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -38,7 +38,7 @@ func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClie } // Register registers a new custom capability for the -func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) { +func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomMessageSender, func(), error) { cr.mux.Lock() defer cr.mux.Unlock() @@ -51,7 +51,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM Capabilities: capabilities, }) if err != nil { - return nil, fmt.Errorf("set custom capabilities: %w", err) + return nil, nil, fmt.Errorf("set custom capabilities: %w", err) } capabilityList := cr.capabilityToCallbacks[capability] @@ -67,7 +67,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM // note: We'll have to set the self element in order for the custom capability to remove itself. cc.selfElement = callbackElem - return cc, nil + return cc, cr.removeCapabilityFunc(capability, callbackElem, cc), nil } // ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for @@ -93,30 +93,36 @@ func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { } } -// RemoveCustomCapability removes the custom capability with the given callback list element, +// removeCapabilityFunc returns a func that removes the custom capability with the given callback list element and sender, // then recalculates and sets the list of custom capabilities on the OpAMP client. -func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, callbackElement *list.Element) { - cr.mux.Lock() - defer cr.mux.Unlock() +func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element, sender *customCapabilitySender) func() { + return func() { + // Mark the sender as unregistered, so that no more sends may be performed. + sender.markUnregistered() - callbackList := cr.capabilityToCallbacks[capability] - callbackList.Remove(callbackElement) + cr.mux.Lock() + defer cr.mux.Unlock() - if callbackList.Front() == nil { - // Since there are no more callbacks for this capability, - // this capability is no longer supported - delete(cr.capabilityToCallbacks, capability) - } + callbackList := cr.capabilityToCallbacks[capability] + callbackList.Remove(callbackElement) - capabilities := cr.capabilities() - err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ - Capabilities: capabilities, - }) - if err != nil { - // It's OK if we couldn't actually remove the capability, it just means we won't - // notify the server properly, and the server may send us messages that we have no associated callbacks for. - cr.logger.Error("Failed to set new capabilities", zap.Error(err)) + if callbackList.Front() == nil { + // Since there are no more callbacks for this capability, + // this capability is no longer supported + delete(cr.capabilityToCallbacks, capability) + } + + capabilities := cr.capabilities() + err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ + Capabilities: capabilities, + }) + if err != nil { + // It's OK if we couldn't actually remove the capability, it just means we won't + // notify the server properly, and the server may send us messages that we have no associated callbacks for. + cr.logger.Error("Failed to set new capabilities", zap.Error(err)) + } } + } // capabilities gives the current set of custom capabilities with at least one @@ -125,15 +131,16 @@ func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToCallbacks) } -type customCapabilityHandler struct { +type customCapabilitySender struct { // unregisteredMux protects unregistered, and makes sure that a message cannot be sent // on an unregistered capability. unregisteredMux *sync.Mutex - capability string - selfElement *list.Element - opampClient customCapabilityClient - registry *customCapabilityRegistry + capability string + selfElement *list.Element + opampClient customCapabilityClient + registry *customCapabilityRegistry + unregistered bool } @@ -141,8 +148,8 @@ func newCustomCapability( registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string, -) *customCapabilityHandler { - return &customCapabilityHandler{ +) *customCapabilitySender { + return &customCapabilitySender{ unregisteredMux: &sync.Mutex{}, capability: capability, @@ -152,7 +159,7 @@ func newCustomCapability( } // SendMessage synchronously sends the message -func (c *customCapabilityHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { +func (c *customCapabilitySender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() @@ -169,14 +176,9 @@ func (c *customCapabilityHandler) SendMessage(messageType string, message []byte return c.opampClient.SendCustomMessage(cm) } -func (c *customCapabilityHandler) Unregister() { +func (c *customCapabilitySender) markUnregistered() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() - if c.unregistered { - return - } - c.unregistered = true - c.registry.RemoveCustomCapability(c.capability, c.selfElement) } diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index e86172afbbaec..9a45f7dea0686 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -31,9 +31,10 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + capability, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) require.NoError(t, err) require.NotNil(t, capability) + require.NotNil(t, unregister) }) t.Run("Setting capabilities fails", func(t *testing.T) { @@ -48,9 +49,10 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + capability, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) require.Nil(t, capability) require.ErrorIs(t, err, capabilityErr) + require.Nil(t, unregister) require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") }) } @@ -71,13 +73,14 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) callbackCalledChan := make(chan struct{}) - capability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + capability, unregister, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { require.Equal(t, customMessage, c) close(callbackCalledChan) }) require.NotNil(t, capability) require.NoError(t, err) + require.NotNil(t, unregister) registry.ProcessMessage(customMessage) select { @@ -115,22 +118,24 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) teapotCalledChan := make(chan struct{}) - capabilityTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { + capabilityTeapot, unregisterTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageSteep, c) close(teapotCalledChan) }) require.NotNil(t, capabilityTeapot) require.NoError(t, err) + require.NotNil(t, unregisterTeapot) coffeeMakerCalledChan := make(chan struct{}) - capabilityCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { + capabilityCoffeeMaker, unregisterCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageBrew, c) close(coffeeMakerCalledChan) }) require.NotNil(t, capabilityCoffeeMaker) require.NoError(t, err) + require.NotNil(t, unregisterCoffeeMaker) registry.ProcessMessage(customMessageSteep) registry.ProcessMessage(customMessageBrew) @@ -167,9 +172,10 @@ func TestCustomCapability_SendMesage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) + capability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) require.NoError(t, err) require.NotNil(t, capability) + require.NotNil(t, unregister) channel, err := capability.SendMessage(messageType, mesageBytes) require.NoError(t, err) @@ -192,13 +198,14 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) require.NoError(t, err) + require.NotNil(t, unregister) - unregisteredCapability.Unregister() + unregister() registry.ProcessMessage(customMessage) }) @@ -217,21 +224,23 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) require.NoError(t, err) + require.NotNil(t, unregister) client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { return fmt.Errorf("failed to set capabilities") } - unregisteredCapability.Unregister() + unregister() registry.ProcessMessage(customMessage) }) + // FIXME this test is broken t.Run("Does not send if unregistered", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" messageType := "steep" @@ -241,13 +250,14 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) require.NotNil(t, unregisteredCapability) require.NoError(t, err) + require.NotNil(t, unregister) - unregisteredCapability.Unregister() + unregister() _, err = unregisteredCapability.SendMessage(messageType, mesageBytes) require.ErrorContains(t, err, "capability has already been unregistered") From d506ea788d6b9bfe8af388755b9f153467cc625d Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 17 Apr 2024 15:42:15 -0400 Subject: [PATCH 18/27] comment/naming cleanup --- extension/opampextension/custom_messages.go | 4 +-- extension/opampextension/registry.go | 28 +++++++-------- extension/opampextension/registry_test.go | 40 ++++++++++----------- 3 files changed, 34 insertions(+), 38 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index 5bdccf9e0771d..70a19c032bba1 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -7,14 +7,14 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) -// CustomMessageCallback is a callback that handles a custom message from opamp. +// CustomMessageCallback is a callback that handles a custom message from an OpAMP server. type CustomMessageCallback func(*protobufs.CustomMessage) // CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. type CustomCapabilityRegistry interface { // Register registers a new custom capability. Any messages for the capability // will be received by the given callback asynchronously. - // It returns a handle to a CustomCapability, which can be used to unregister, or send + // It returns a handle to a CustomCapability, which can be used to send // a message to the OpAMP server. // It also returns a function that can be used to unregister the capability. Register(capability string, callback CustomMessageCallback) (sender CustomMessageSender, unregister func(), err error) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index 85f76ad1e9e7c..d74795ddd592c 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -15,7 +15,7 @@ import ( "golang.org/x/exp/maps" ) -// customCapabilityClient is a subset of OpAMP client containing only the +// customCapabilityClient is a subset of OpAMP client containing only the methods needed for the customCapabilityRegistry. type customCapabilityClient interface { SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) @@ -37,7 +37,7 @@ func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClie } } -// Register registers a new custom capability for the +// Register implements CustomCapabilityRegistry.Register func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomMessageSender, func(), error) { cr.mux.Lock() defer cr.mux.Unlock() @@ -62,12 +62,9 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM callbackElem := capabilityList.PushBack(callback) - cc := newCustomCapability(cr, cr.client, capability) + sender := newCustomMessageSender(cr, cr.client, capability) - // note: We'll have to set the self element in order for the custom capability to remove itself. - cc.selfElement = callbackElem - - return cc, cr.removeCapabilityFunc(capability, callbackElem, cc), nil + return sender, cr.removeCapabilityFunc(capability, callbackElem, sender), nil } // ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for @@ -95,7 +92,7 @@ func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { // removeCapabilityFunc returns a func that removes the custom capability with the given callback list element and sender, // then recalculates and sets the list of custom capabilities on the OpAMP client. -func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element, sender *customCapabilitySender) func() { +func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element, sender *customMessageSender) func() { return func() { // Mark the sender as unregistered, so that no more sends may be performed. sender.markUnregistered() @@ -131,25 +128,24 @@ func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToCallbacks) } -type customCapabilitySender struct { +type customMessageSender struct { // unregisteredMux protects unregistered, and makes sure that a message cannot be sent // on an unregistered capability. unregisteredMux *sync.Mutex capability string - selfElement *list.Element opampClient customCapabilityClient registry *customCapabilityRegistry unregistered bool } -func newCustomCapability( +func newCustomMessageSender( registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string, -) *customCapabilitySender { - return &customCapabilitySender{ +) *customMessageSender { + return &customMessageSender{ unregisteredMux: &sync.Mutex{}, capability: capability, @@ -158,8 +154,8 @@ func newCustomCapability( } } -// SendMessage synchronously sends the message -func (c *customCapabilitySender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { +// SendMessage implements CustomMessageSender.SendMessage +func (c *customMessageSender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() @@ -176,7 +172,7 @@ func (c *customCapabilitySender) SendMessage(messageType string, message []byte) return c.opampClient.SendCustomMessage(cm) } -func (c *customCapabilitySender) markUnregistered() { +func (c *customMessageSender) markUnregistered() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index 9a45f7dea0686..45bd789f422df 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -31,9 +31,9 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + sender, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) require.NoError(t, err) - require.NotNil(t, capability) + require.NotNil(t, sender) require.NotNil(t, unregister) }) @@ -49,8 +49,8 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) - require.Nil(t, capability) + sender, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + require.Nil(t, sender) require.ErrorIs(t, err, capabilityErr) require.Nil(t, unregister) require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") @@ -73,12 +73,12 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) callbackCalledChan := make(chan struct{}) - capability, unregister, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { + sender, unregister, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { require.Equal(t, customMessage, c) close(callbackCalledChan) }) - require.NotNil(t, capability) + require.NotNil(t, sender) require.NoError(t, err) require.NotNil(t, unregister) @@ -118,22 +118,22 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) teapotCalledChan := make(chan struct{}) - capabilityTeapot, unregisterTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { + teapotSender, unregisterTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageSteep, c) close(teapotCalledChan) }) - require.NotNil(t, capabilityTeapot) + require.NotNil(t, teapotSender) require.NoError(t, err) require.NotNil(t, unregisterTeapot) coffeeMakerCalledChan := make(chan struct{}) - capabilityCoffeeMaker, unregisterCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { + coffeeMakerSender, unregisterCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { require.Equal(t, customMessageBrew, c) close(coffeeMakerCalledChan) }) - require.NotNil(t, capabilityCoffeeMaker) + require.NotNil(t, coffeeMakerSender) require.NoError(t, err) require.NotNil(t, unregisterCoffeeMaker) @@ -172,12 +172,12 @@ func TestCustomCapability_SendMesage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - capability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) + sender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) require.NoError(t, err) - require.NotNil(t, capability) + require.NotNil(t, sender) require.NotNil(t, unregister) - channel, err := capability.SendMessage(messageType, mesageBytes) + channel, err := sender.SendMessage(messageType, mesageBytes) require.NoError(t, err) require.Nil(t, channel, nil) }) @@ -198,10 +198,10 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) - require.NotNil(t, unregisteredCapability) + require.NotNil(t, unregisteredSender) require.NoError(t, err) require.NotNil(t, unregister) @@ -224,10 +224,10 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) - require.NotNil(t, unregisteredCapability) + require.NotNil(t, unregisteredSender) require.NoError(t, err) require.NotNil(t, unregister) @@ -250,16 +250,16 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredCapability, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { + unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { t.Fatalf("Unregistered capability should not be called") }) - require.NotNil(t, unregisteredCapability) + require.NotNil(t, unregisteredSender) require.NoError(t, err) require.NotNil(t, unregister) unregister() - _, err = unregisteredCapability.SendMessage(messageType, mesageBytes) + _, err = unregisteredSender.SendMessage(messageType, mesageBytes) require.ErrorContains(t, err, "capability has already been unregistered") }) } From 07597d32bf40b33cb7aae6581963985cc15201e0 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 23 Apr 2024 02:15:28 -0400 Subject: [PATCH 19/27] Refactor into suggested interface --- extension/opampextension/README.md | 18 +-- extension/opampextension/custom_messages.go | 52 ++++++--- extension/opampextension/opamp_agent.go | 4 +- extension/opampextension/registry.go | 103 ++++++++++------- extension/opampextension/registry_test.go | 116 ++++++++++---------- 5 files changed, 168 insertions(+), 125 deletions(-) diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index c60519c4de12c..0571b4cb9c0eb 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -49,19 +49,19 @@ Other components may use a configured OpAMP extension to send and receive custom ```go func Start(_ context.Context, host component.Host) error { - ext, ok := host.GetExtensions()[opampExtensionID] - if !ok { - return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) - } - - registry, ok := ext.(opampextension.CustomCapabilityRegistry) - if !ok { - return fmt.Errorf("extension %q is not an custom message registry", opampExtensionID) + ext, ok := host.GetExtensions()[opampExtensionID] + if !ok { + return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) + } + + registry, ok := ext.(opampextension.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not an custom message registry", opampExtensionID) } // You can now use registry.Register to register a custom capability - return nil + return nil } ``` diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index 70a19c032bba1..bb0355da26c67 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -3,26 +3,46 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" -import ( - "github.com/open-telemetry/opamp-go/protobufs" -) +import "github.com/open-telemetry/opamp-go/protobufs" -// CustomMessageCallback is a callback that handles a custom message from an OpAMP server. -type CustomMessageCallback func(*protobufs.CustomMessage) +// customCapabilityRegisterOptions represent extra options that can be use in CustomCapabilityRegistry.Register +type customCapabilityRegisterOptions struct { + MaxQueuedMessages int +} + +// defaultCustomCapabilityRegisterOptions returns the default options for CustomCapabilityRegisterOptions +func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions { + return &customCapabilityRegisterOptions{ + MaxQueuedMessages: 10, + } +} + +// CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register +type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions) + +// WithMaxQueuedMessages overrides the default amount of max queue messages. If a message is recieved while +// MaxQueuedMessages messages are already queued to be processed, the message is dropped. +func WithMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption { + return func(c *customCapabilityRegisterOptions) { + c.MaxQueuedMessages = maxQueuedMessages + } +} // CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. type CustomCapabilityRegistry interface { - // Register registers a new custom capability. Any messages for the capability - // will be received by the given callback asynchronously. - // It returns a handle to a CustomCapability, which can be used to send - // a message to the OpAMP server. - // It also returns a function that can be used to unregister the capability. - Register(capability string, callback CustomMessageCallback) (sender CustomMessageSender, unregister func(), err error) + // Register registers a new custom capability. + // It returns a CustomMessageHandler, which can be used to send and receive + // messages to/from the OpAMP server. + Register(capability string, opts ...CustomCapabilityRegisterOption) (handler CustomCapabilityHandler, err error) } -// CustomMessageSender represents a handle to a custom capability. -// This can be used to send a custom message to an OpAMP server. -type CustomMessageSender interface { +// CustomCapabilityHandler represents a handler for a custom capability. +// This can be used to send and receive custom messages to/from an OpAMP server. +// It can also be used to unregister the custom capability when it is no longer supported. +type CustomCapabilityHandler interface { + // Message returns a channel that can be used to receive custom messages sent from the OpAMP server. + Message() <-chan *protobufs.CustomMessage + // SendMessage sends a custom message to the OpAMP server. // // Only one message can be sent at a time. If SendCustomMessage has been already called @@ -35,4 +55,8 @@ type CustomMessageSender interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) + + // Unregister unregisters the custom capability. After this method returns, SendMessage will always return an error, + // and Message will no longer receive further custom messages. + Unregister() } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index a71b4ed13a40e..4f96d8247767b 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -125,8 +125,8 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } -func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomMessageSender, func(), error) { - return o.customCapabilityRegistry.Register(capability, callback) +func (o *opampAgent) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) { + return o.customCapabilityRegistry.Register(capability, opts...) } func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index d74795ddd592c..580f9cf525ba6 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -22,23 +22,28 @@ type customCapabilityClient interface { } type customCapabilityRegistry struct { - mux *sync.Mutex - capabilityToCallbacks map[string]*list.List - client customCapabilityClient - logger *zap.Logger + mux *sync.Mutex + capabilityToMsgChannels map[string]*list.List + client customCapabilityClient + logger *zap.Logger } func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ - mux: &sync.Mutex{}, - capabilityToCallbacks: make(map[string]*list.List), - client: client, - logger: logger, + mux: &sync.Mutex{}, + capabilityToMsgChannels: make(map[string]*list.List), + client: client, + logger: logger, } } // Register implements CustomCapabilityRegistry.Register -func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomMessageSender, func(), error) { +func (cr *customCapabilityRegistry) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) { + optsStruct := defaultCustomCapabilityRegisterOptions() + for _, opt := range opts { + opt(optsStruct) + } + cr.mux.Lock() defer cr.mux.Unlock() @@ -51,62 +56,65 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM Capabilities: capabilities, }) if err != nil { - return nil, nil, fmt.Errorf("set custom capabilities: %w", err) + return nil, fmt.Errorf("set custom capabilities: %w", err) } - capabilityList := cr.capabilityToCallbacks[capability] + capabilityList := cr.capabilityToMsgChannels[capability] if capabilityList == nil { capabilityList = list.New() - cr.capabilityToCallbacks[capability] = capabilityList + cr.capabilityToMsgChannels[capability] = capabilityList } - callbackElem := capabilityList.PushBack(callback) + msgChan := make(chan *protobufs.CustomMessage, optsStruct.MaxQueuedMessages) + callbackElem := capabilityList.PushBack(msgChan) - sender := newCustomMessageSender(cr, cr.client, capability) + unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem) + sender := newCustomMessageSender(cr, cr.client, capability, msgChan, unregisterFunc) - return sender, cr.removeCapabilityFunc(capability, callbackElem, sender), nil + return sender, nil } -// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for +// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered capability handlers for // the messages capability. func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { cr.mux.Lock() defer cr.mux.Unlock() - callbacks, ok := cr.capabilityToCallbacks[cm.Capability] + msgChannels, ok := cr.capabilityToMsgChannels[cm.Capability] if !ok { return } - for node := callbacks.Front(); node != nil; node = node.Next() { - cb, ok := node.Value.(CustomMessageCallback) + for node := msgChannels.Front(); node != nil; node = node.Next() { + msgChan, ok := node.Value.(chan *protobufs.CustomMessage) if !ok { continue } - // Let the callback process asynchronously in a separate goroutine so it can't block - // the opamp extension - go cb(cm) + // If the channel is full, we will skip sending the message to the receiver. + // We do this because we don't want a misbehaving component to be able to + // block the opamp extension, or block other components from receiving messages. + select { + case msgChan <- cm: + default: + } } } -// removeCapabilityFunc returns a func that removes the custom capability with the given callback list element and sender, +// removeCapabilityFunc returns a func that removes the custom capability with the given msg channel list element and sender, // then recalculates and sets the list of custom capabilities on the OpAMP client. -func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element, sender *customMessageSender) func() { +func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element) func() { return func() { - // Mark the sender as unregistered, so that no more sends may be performed. - sender.markUnregistered() - cr.mux.Lock() defer cr.mux.Unlock() - callbackList := cr.capabilityToCallbacks[capability] - callbackList.Remove(callbackElement) + msgChanList := cr.capabilityToMsgChannels[capability] + msgChanList.Remove(callbackElement) - if callbackList.Front() == nil { + if msgChanList.Front() == nil { // Since there are no more callbacks for this capability, // this capability is no longer supported - delete(cr.capabilityToCallbacks, capability) + delete(cr.capabilityToMsgChannels, capability) } capabilities := cr.capabilities() @@ -119,13 +127,12 @@ func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, call cr.logger.Error("Failed to set new capabilities", zap.Error(err)) } } - } // capabilities gives the current set of custom capabilities with at least one // callback registered. func (cr *customCapabilityRegistry) capabilities() []string { - return maps.Keys(cr.capabilityToCallbacks) + return maps.Keys(cr.capabilityToMsgChannels) } type customMessageSender struct { @@ -133,9 +140,11 @@ type customMessageSender struct { // on an unregistered capability. unregisteredMux *sync.Mutex - capability string - opampClient customCapabilityClient - registry *customCapabilityRegistry + capability string + opampClient customCapabilityClient + registry *customCapabilityRegistry + sendChan <-chan *protobufs.CustomMessage + unregisterCapabilityFunc func() unregistered bool } @@ -144,17 +153,26 @@ func newCustomMessageSender( registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string, + sendChan <-chan *protobufs.CustomMessage, + unregisterCapabilityFunc func(), ) *customMessageSender { return &customMessageSender{ unregisteredMux: &sync.Mutex{}, - capability: capability, - opampClient: opampClient, - registry: registry, + capability: capability, + opampClient: opampClient, + registry: registry, + sendChan: sendChan, + unregisterCapabilityFunc: unregisterCapabilityFunc, } } -// SendMessage implements CustomMessageSender.SendMessage +// Message implements CustomCapabilityHandler.Message +func (c *customMessageSender) Message() <-chan *protobufs.CustomMessage { + return c.sendChan +} + +// SendMessage implements CustomCapabilityHandler.SendMessage func (c *customMessageSender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() @@ -172,9 +190,12 @@ func (c *customMessageSender) SendMessage(messageType string, message []byte) (m return c.opampClient.SendCustomMessage(cm) } -func (c *customMessageSender) markUnregistered() { +// Unregister implements CustomCapabilityHandler.Unregister +func (c *customMessageSender) Unregister() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() c.unregistered = true + + c.unregisterCapabilityFunc() } diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index 45bd789f422df..57abf122f2265 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "testing" - "time" "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/require" @@ -31,10 +30,9 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - sender, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + sender, err := registry.Register(capabilityString) require.NoError(t, err) require.NotNil(t, sender) - require.NotNil(t, unregister) }) t.Run("Setting capabilities fails", func(t *testing.T) { @@ -49,11 +47,10 @@ func TestRegistry_Register(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - sender, unregister, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {}) + sender, err := registry.Register(capabilityString) require.Nil(t, sender) require.ErrorIs(t, err, capabilityErr) - require.Nil(t, unregister) - require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways") + require.Len(t, registry.capabilityToMsgChannels, 0, "Setting capability failed, but callback ended up in the map anyways") }) } @@ -72,23 +69,37 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - callbackCalledChan := make(chan struct{}) - sender, unregister, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) { - require.Equal(t, customMessage, c) - - close(callbackCalledChan) - }) + sender, err := registry.Register(capabilityString) require.NotNil(t, sender) require.NoError(t, err) - require.NotNil(t, unregister) registry.ProcessMessage(customMessage) - select { - case <-time.After(2 * time.Second): - t.Fatalf("Timed out waiting for callback to be called") - case <-callbackCalledChan: // OK + + require.Equal(t, customMessage, <-sender.Message()) + }) + + t.Run("Skips blocked message channels", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, } + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString, WithMaxQueuedMessages(0)) + require.NotNil(t, sender) + require.NoError(t, err) + + // If we did not skip sending on blocked channels, we'd expect this to never return. + registry.ProcessMessage(customMessage) + + require.Equal(t, 0, len(sender.Message())) }) t.Run("Callback is called only for its own capability", func(t *testing.T) { @@ -117,39 +128,19 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - teapotCalledChan := make(chan struct{}) - teapotSender, unregisterTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) { - require.Equal(t, customMessageSteep, c) - - close(teapotCalledChan) - }) + teapotSender, err := registry.Register(teapotCapabilityString1) require.NotNil(t, teapotSender) require.NoError(t, err) - require.NotNil(t, unregisterTeapot) - - coffeeMakerCalledChan := make(chan struct{}) - coffeeMakerSender, unregisterCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) { - require.Equal(t, customMessageBrew, c) - close(coffeeMakerCalledChan) - }) + coffeeMakerSender, err := registry.Register(coffeeMakerCapabilityString2) require.NotNil(t, coffeeMakerSender) require.NoError(t, err) - require.NotNil(t, unregisterCoffeeMaker) registry.ProcessMessage(customMessageSteep) registry.ProcessMessage(customMessageBrew) - select { - case <-time.After(2 * time.Second): - t.Fatalf("Timed out waiting for callback 1 to be called") - case <-coffeeMakerCalledChan: // OK - } - select { - case <-time.After(2 * time.Second): - t.Fatalf("Timed out waiting for callback 2 to be called") - case <-coffeeMakerCalledChan: // OK - } + require.Equal(t, customMessageSteep, <-teapotSender.Message()) + require.Equal(t, customMessageBrew, <-coffeeMakerSender.Message()) }) } @@ -172,10 +163,9 @@ func TestCustomCapability_SendMesage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - sender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {}) + sender, err := registry.Register(capabilityString) require.NoError(t, err) require.NotNil(t, sender) - require.NotNil(t, unregister) channel, err := sender.SendMessage(messageType, mesageBytes) require.NoError(t, err) @@ -198,16 +188,19 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) + unregisteredSender, err := registry.Register(capabilityString) require.NotNil(t, unregisteredSender) require.NoError(t, err) - require.NotNil(t, unregister) - unregister() + unregisteredSender.Unregister() registry.ProcessMessage(customMessage) + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } }) t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) { @@ -224,23 +217,25 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) + unregisteredSender, err := registry.Register(capabilityString) require.NotNil(t, unregisteredSender) require.NoError(t, err) - require.NotNil(t, unregister) client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { return fmt.Errorf("failed to set capabilities") } - unregister() + unregisteredSender.Unregister() registry.ProcessMessage(customMessage) + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } }) - // FIXME this test is broken t.Run("Does not send if unregistered", func(t *testing.T) { capabilityString := "io.opentelemetry.teapot" messageType := "steep" @@ -250,17 +245,20 @@ func TestCustomCapability_Unregister(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - unregisteredSender, unregister, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) { - t.Fatalf("Unregistered capability should not be called") - }) + unregisteredSender, err := registry.Register(capabilityString) require.NotNil(t, unregisteredSender) require.NoError(t, err) - require.NotNil(t, unregister) - unregister() + unregisteredSender.Unregister() _, err = unregisteredSender.SendMessage(messageType, mesageBytes) require.ErrorContains(t, err, "capability has already been unregistered") + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } }) } From 448cf875c9f4c2fc5dd8a081b6025eb4484dfc10 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 23 Apr 2024 02:29:54 -0400 Subject: [PATCH 20/27] fix typo in readme example --- extension/opampextension/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index 0571b4cb9c0eb..fc199381fd7e5 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -56,7 +56,7 @@ func Start(_ context.Context, host component.Host) error { registry, ok := ext.(opampextension.CustomCapabilityRegistry) if !ok { - return fmt.Errorf("extension %q is not an custom message registry", opampExtensionID) + return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID) } // You can now use registry.Register to register a custom capability From 1847e41040c24ab86da99f7de71f33a2bfe6460c Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 23 Apr 2024 12:03:31 -0400 Subject: [PATCH 21/27] fix spelling error in comment --- extension/opampextension/custom_messages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index bb0355da26c67..e00d08fffaae0 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -20,7 +20,7 @@ func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions { // CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions) -// WithMaxQueuedMessages overrides the default amount of max queue messages. If a message is recieved while +// WithMaxQueuedMessages overrides the default amount of max queue messages. If a message is received while // MaxQueuedMessages messages are already queued to be processed, the message is dropped. func WithMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption { return func(c *customCapabilityRegisterOptions) { From 2629cf8756bd09f2961a868be90f6b18c536d7d3 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:21:55 -0400 Subject: [PATCH 22/27] Fix readme indentation --- extension/opampextension/README.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index fc199381fd7e5..e1da423682431 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -49,19 +49,19 @@ Other components may use a configured OpAMP extension to send and receive custom ```go func Start(_ context.Context, host component.Host) error { - ext, ok := host.GetExtensions()[opampExtensionID] - if !ok { - return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) - } - - registry, ok := ext.(opampextension.CustomCapabilityRegistry) - if !ok { - return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID) + ext, ok := host.GetExtensions()[opampExtensionID] + if !ok { + return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) } - // You can now use registry.Register to register a custom capability + registry, ok := ext.(opampextension.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID) + } + + // You can now use registry.Register to register a custom capability - return nil + return nil } ``` From ff42ce106d01c433455e42f641c848c30a0d2895 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:24:20 -0400 Subject: [PATCH 23/27] Fix comments, unexport withMaxQueuedMessages --- extension/opampextension/custom_messages.go | 6 +++--- extension/opampextension/registry_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go index e00d08fffaae0..b6183964432d4 100644 --- a/extension/opampextension/custom_messages.go +++ b/extension/opampextension/custom_messages.go @@ -5,7 +5,7 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import "github.com/open-telemetry/opamp-go/protobufs" -// customCapabilityRegisterOptions represent extra options that can be use in CustomCapabilityRegistry.Register +// customCapabilityRegisterOptions represents extra options that can be use in CustomCapabilityRegistry.Register type customCapabilityRegisterOptions struct { MaxQueuedMessages int } @@ -20,9 +20,9 @@ func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions { // CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions) -// WithMaxQueuedMessages overrides the default amount of max queue messages. If a message is received while +// withMaxQueuedMessages overrides the maximum number of queued messages. If a message is received while // MaxQueuedMessages messages are already queued to be processed, the message is dropped. -func WithMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption { +func withMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption { return func(c *customCapabilityRegisterOptions) { c.MaxQueuedMessages = maxQueuedMessages } diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index 57abf122f2265..b18388d36da09 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -92,7 +92,7 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry := newCustomCapabilityRegistry(zap.NewNop(), client) - sender, err := registry.Register(capabilityString, WithMaxQueuedMessages(0)) + sender, err := registry.Register(capabilityString, withMaxQueuedMessages(0)) require.NotNil(t, sender) require.NoError(t, err) From 281ee8da6c692e5416ce871275dfdfc233e83809 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:25:36 -0400 Subject: [PATCH 24/27] add implementation assertions --- extension/opampextension/registry.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index 580f9cf525ba6..b18aef2e0b355 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -28,6 +28,8 @@ type customCapabilityRegistry struct { logger *zap.Logger } +var _ CustomCapabilityRegistry = (*customCapabilityRegistry)(nil) + func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { return &customCapabilityRegistry{ mux: &sync.Mutex{}, @@ -149,6 +151,8 @@ type customMessageSender struct { unregistered bool } +var _ CustomCapabilityHandler = (*customMessageSender)(nil) + func newCustomMessageSender( registry *customCapabilityRegistry, opampClient customCapabilityClient, From 0e3d8b12af936b388ef96716681265a057235394 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:26:03 -0400 Subject: [PATCH 25/27] rename customeMessageSender -> customMessageHandler --- extension/opampextension/registry.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index b18aef2e0b355..acf6119fbc8e7 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -137,7 +137,7 @@ func (cr *customCapabilityRegistry) capabilities() []string { return maps.Keys(cr.capabilityToMsgChannels) } -type customMessageSender struct { +type customMessageHandler struct { // unregisteredMux protects unregistered, and makes sure that a message cannot be sent // on an unregistered capability. unregisteredMux *sync.Mutex @@ -151,7 +151,7 @@ type customMessageSender struct { unregistered bool } -var _ CustomCapabilityHandler = (*customMessageSender)(nil) +var _ CustomCapabilityHandler = (*customMessageHandler)(nil) func newCustomMessageSender( registry *customCapabilityRegistry, @@ -159,8 +159,8 @@ func newCustomMessageSender( capability string, sendChan <-chan *protobufs.CustomMessage, unregisterCapabilityFunc func(), -) *customMessageSender { - return &customMessageSender{ +) *customMessageHandler { + return &customMessageHandler{ unregisteredMux: &sync.Mutex{}, capability: capability, @@ -172,12 +172,12 @@ func newCustomMessageSender( } // Message implements CustomCapabilityHandler.Message -func (c *customMessageSender) Message() <-chan *protobufs.CustomMessage { +func (c *customMessageHandler) Message() <-chan *protobufs.CustomMessage { return c.sendChan } // SendMessage implements CustomCapabilityHandler.SendMessage -func (c *customMessageSender) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { +func (c *customMessageHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() @@ -195,7 +195,7 @@ func (c *customMessageSender) SendMessage(messageType string, message []byte) (m } // Unregister implements CustomCapabilityHandler.Unregister -func (c *customMessageSender) Unregister() { +func (c *customMessageHandler) Unregister() { c.unregisteredMux.Lock() defer c.unregisteredMux.Unlock() From 1f8921d9aa30d6af65ed251f7fb7df16616a3497 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:28:38 -0400 Subject: [PATCH 26/27] Check channels are empty in multi capability test --- extension/opampextension/registry_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go index b18388d36da09..79ed2bf23332d 100644 --- a/extension/opampextension/registry_test.go +++ b/extension/opampextension/registry_test.go @@ -140,7 +140,9 @@ func TestRegistry_ProcessMessage(t *testing.T) { registry.ProcessMessage(customMessageBrew) require.Equal(t, customMessageSteep, <-teapotSender.Message()) + require.Empty(t, teapotSender.Message()) require.Equal(t, customMessageBrew, <-coffeeMakerSender.Message()) + require.Empty(t, coffeeMakerSender.Message()) }) } From 51669889f490e9199506aec868c51a51137a2935 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 24 Apr 2024 11:47:42 -0400 Subject: [PATCH 27/27] newCustomMessageSender -> newCustomMessageHandler --- extension/opampextension/registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go index acf6119fbc8e7..4c97c5500c20f 100644 --- a/extension/opampextension/registry.go +++ b/extension/opampextension/registry.go @@ -71,7 +71,7 @@ func (cr *customCapabilityRegistry) Register(capability string, opts ...CustomCa callbackElem := capabilityList.PushBack(msgChan) unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem) - sender := newCustomMessageSender(cr, cr.client, capability, msgChan, unregisterFunc) + sender := newCustomMessageHandler(cr, cr.client, capability, msgChan, unregisterFunc) return sender, nil } @@ -153,7 +153,7 @@ type customMessageHandler struct { var _ CustomCapabilityHandler = (*customMessageHandler)(nil) -func newCustomMessageSender( +func newCustomMessageHandler( registry *customCapabilityRegistry, opampClient customCapabilityClient, capability string,