Skip to content

Commit

Permalink
[extension/opamp]: Custom Message Support (open-telemetry#32281)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
* Adds support for utilizing custom capabilities with the opamp
extension

**Link to tracking Issue:** open-telemetry#32021

**Testing:**
* Unit tests

**Documentation:**
Added some docs for usage to the opamp extension README
  • Loading branch information
BinaryFissionGames committed May 3, 2024
1 parent bb88759 commit c160941
Show file tree
Hide file tree
Showing 6 changed files with 625 additions and 15 deletions.
22 changes: 22 additions & 0 deletions .chloggen/feat_opamp-extension-custom-messages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added support for other components to register custom capabilities and receive custom messages from an opamp extension"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32021]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 24 additions & 0 deletions extension/opampextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ extensions:
endpoint: wss:https://127.0.0.1:4320/v1/opamp
```

## Custom Messages

Other components may use a configured OpAMP extension to send and receive custom messages to and from an OpAMP server. Components may use the provided `components.Host` from the Start method in order to get a handle to the registry:

```go
func Start(_ context.Context, host component.Host) error {
ext, ok := host.GetExtensions()[opampExtensionID]
if !ok {
return fmt.Errorf("opamp extension %q does not exist", opampExtensionID)
}

registry, ok := ext.(opampextension.CustomCapabilityRegistry)
if !ok {
return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID)
}

// You can now use registry.Register to register a custom capability

return nil
}
```

See the [custom_messages.go](./custom_messages.go) for more information on the custom message API.

## Status

This OpenTelemetry OpAMP agent extension is intended to support the [OpAMP
Expand Down
62 changes: 62 additions & 0 deletions extension/opampextension/custom_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension"

import "github.com/open-telemetry/opamp-go/protobufs"

// customCapabilityRegisterOptions represents extra options that can be use in CustomCapabilityRegistry.Register
type customCapabilityRegisterOptions struct {
MaxQueuedMessages int
}

// defaultCustomCapabilityRegisterOptions returns the default options for CustomCapabilityRegisterOptions
func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions {
return &customCapabilityRegisterOptions{
MaxQueuedMessages: 10,
}
}

// CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register
type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions)

// withMaxQueuedMessages overrides the maximum number of queued messages. If a message is received while
// MaxQueuedMessages messages are already queued to be processed, the message is dropped.
func withMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption {
return func(c *customCapabilityRegisterOptions) {
c.MaxQueuedMessages = maxQueuedMessages
}
}

// CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages.
type CustomCapabilityRegistry interface {
// Register registers a new custom capability.
// It returns a CustomMessageHandler, which can be used to send and receive
// messages to/from the OpAMP server.
Register(capability string, opts ...CustomCapabilityRegisterOption) (handler CustomCapabilityHandler, err error)
}

// CustomCapabilityHandler represents a handler for a custom capability.
// This can be used to send and receive custom messages to/from an OpAMP server.
// It can also be used to unregister the custom capability when it is no longer supported.
type CustomCapabilityHandler interface {
// Message returns a channel that can be used to receive custom messages sent from the OpAMP server.
Message() <-chan *protobufs.CustomMessage

// SendMessage sends a custom message to the OpAMP server.
//
// Only one message can be sent at a time. If SendCustomMessage has been already called
// for any capability, and the message is still pending (in progress)
// then subsequent calls to SendCustomMessage will return github.com/open-telemetry/opamp-go/types.ErrCustomMessagePending
// and a channel that will be closed when the pending message is sent.
// To ensure that the previous send is complete and it is safe to send another CustomMessage,
// the caller should wait for the returned channel to be closed before attempting to send another custom message.
//
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error)

// Unregister unregisters the custom capability. After this method returns, SendMessage will always return an error,
// and Message will no longer receive further custom messages.
Unregister()
}
42 changes: 27 additions & 15 deletions extension/opampextension/opamp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ type opampAgent struct {
agentDescription *protobufs.AgentDescription

opampClient client.OpAMPClient

customCapabilityRegistry *customCapabilityRegistry
}

var _ CustomCapabilityRegistry = (*opampAgent)(nil)

func (o *opampAgent) Start(ctx context.Context, _ component.Host) error {
header := http.Header{}
for k, v := range o.cfg.Server.GetHeaders() {
Expand Down Expand Up @@ -121,6 +125,10 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error
return nil
}

func (o *opampAgent) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) {
return o.customCapabilityRegistry.Register(capability, opts...)
}

func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) {
o.eclk.Lock()
defer o.eclk.Unlock()
Expand Down Expand Up @@ -162,14 +170,16 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r
}
}

opampClient := cfg.Server.GetClient(logger)
agent := &opampAgent{
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: cfg.Server.GetClient(logger),
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: opampClient,
customCapabilityRegistry: newCustomCapabilityRegistry(logger, opampClient),
}

return agent, nil
Expand Down Expand Up @@ -256,15 +266,17 @@ func (o *opampAgent) composeEffectiveConfig() *protobufs.EffectiveConfig {
}

func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) {
if msg.AgentIdentification == nil {
return
}
if msg.AgentIdentification != nil {
instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid)
if err != nil {
o.logger.Error("Failed to parse a new agent identity", zap.Error(err))
return
}

instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid)
if err != nil {
o.logger.Error("Failed to parse a new agent identity", zap.Error(err))
return
o.updateAgentIdentity(instanceID)
}

o.updateAgentIdentity(instanceID)
if msg.CustomMessage != nil {
o.customCapabilityRegistry.ProcessMessage(msg.CustomMessage)
}
}
205 changes: 205 additions & 0 deletions extension/opampextension/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension"

import (
"container/list"
"errors"
"fmt"
"slices"
"sync"

"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// customCapabilityClient is a subset of OpAMP client containing only the methods needed for the customCapabilityRegistry.
type customCapabilityClient interface {
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
}

type customCapabilityRegistry struct {
mux *sync.Mutex
capabilityToMsgChannels map[string]*list.List
client customCapabilityClient
logger *zap.Logger
}

var _ CustomCapabilityRegistry = (*customCapabilityRegistry)(nil)

func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry {
return &customCapabilityRegistry{
mux: &sync.Mutex{},
capabilityToMsgChannels: make(map[string]*list.List),
client: client,
logger: logger,
}
}

// Register implements CustomCapabilityRegistry.Register
func (cr *customCapabilityRegistry) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) {
optsStruct := defaultCustomCapabilityRegisterOptions()
for _, opt := range opts {
opt(optsStruct)
}

cr.mux.Lock()
defer cr.mux.Unlock()

capabilities := cr.capabilities()
if !slices.Contains(capabilities, capability) {
capabilities = append(capabilities, capability)
}

err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
return nil, fmt.Errorf("set custom capabilities: %w", err)
}

capabilityList := cr.capabilityToMsgChannels[capability]
if capabilityList == nil {
capabilityList = list.New()
cr.capabilityToMsgChannels[capability] = capabilityList
}

msgChan := make(chan *protobufs.CustomMessage, optsStruct.MaxQueuedMessages)
callbackElem := capabilityList.PushBack(msgChan)

unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem)
sender := newCustomMessageHandler(cr, cr.client, capability, msgChan, unregisterFunc)

return sender, nil
}

// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered capability handlers for
// the messages capability.
func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) {
cr.mux.Lock()
defer cr.mux.Unlock()

msgChannels, ok := cr.capabilityToMsgChannels[cm.Capability]
if !ok {
return
}

for node := msgChannels.Front(); node != nil; node = node.Next() {
msgChan, ok := node.Value.(chan *protobufs.CustomMessage)
if !ok {
continue
}

// If the channel is full, we will skip sending the message to the receiver.
// We do this because we don't want a misbehaving component to be able to
// block the opamp extension, or block other components from receiving messages.
select {
case msgChan <- cm:
default:
}
}
}

// removeCapabilityFunc returns a func that removes the custom capability with the given msg channel list element and sender,
// then recalculates and sets the list of custom capabilities on the OpAMP client.
func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element) func() {
return func() {
cr.mux.Lock()
defer cr.mux.Unlock()

msgChanList := cr.capabilityToMsgChannels[capability]
msgChanList.Remove(callbackElement)

if msgChanList.Front() == nil {
// Since there are no more callbacks for this capability,
// this capability is no longer supported
delete(cr.capabilityToMsgChannels, capability)
}

capabilities := cr.capabilities()
err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
// It's OK if we couldn't actually remove the capability, it just means we won't
// notify the server properly, and the server may send us messages that we have no associated callbacks for.
cr.logger.Error("Failed to set new capabilities", zap.Error(err))
}
}
}

// capabilities gives the current set of custom capabilities with at least one
// callback registered.
func (cr *customCapabilityRegistry) capabilities() []string {
return maps.Keys(cr.capabilityToMsgChannels)
}

type customMessageHandler struct {
// unregisteredMux protects unregistered, and makes sure that a message cannot be sent
// on an unregistered capability.
unregisteredMux *sync.Mutex

capability string
opampClient customCapabilityClient
registry *customCapabilityRegistry
sendChan <-chan *protobufs.CustomMessage
unregisterCapabilityFunc func()

unregistered bool
}

var _ CustomCapabilityHandler = (*customMessageHandler)(nil)

func newCustomMessageHandler(
registry *customCapabilityRegistry,
opampClient customCapabilityClient,
capability string,
sendChan <-chan *protobufs.CustomMessage,
unregisterCapabilityFunc func(),
) *customMessageHandler {
return &customMessageHandler{
unregisteredMux: &sync.Mutex{},

capability: capability,
opampClient: opampClient,
registry: registry,
sendChan: sendChan,
unregisterCapabilityFunc: unregisterCapabilityFunc,
}
}

// Message implements CustomCapabilityHandler.Message
func (c *customMessageHandler) Message() <-chan *protobufs.CustomMessage {
return c.sendChan
}

// SendMessage implements CustomCapabilityHandler.SendMessage
func (c *customMessageHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

if c.unregistered {
return nil, errors.New("capability has already been unregistered")
}

cm := &protobufs.CustomMessage{
Capability: c.capability,
Type: messageType,
Data: message,
}

return c.opampClient.SendCustomMessage(cm)
}

// Unregister implements CustomCapabilityHandler.Unregister
func (c *customMessageHandler) Unregister() {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

c.unregistered = true

c.unregisterCapabilityFunc()
}
Loading

0 comments on commit c160941

Please sign in to comment.