Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/opamp]: Custom Message Support #32281

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5182859
custom message interface
BinaryFissionGames Mar 27, 2024
b5fd947
opamp custom messages prototype
BinaryFissionGames Apr 1, 2024
3901617
finish registry + tests
BinaryFissionGames Apr 9, 2024
b83d7c1
ensure extension is a valid registry
BinaryFissionGames Apr 9, 2024
0c1fe09
add license
BinaryFissionGames Apr 9, 2024
8b5f549
add check for unregistered in Unregister
BinaryFissionGames Apr 9, 2024
e204bcf
process custom messages with the registry when received.
BinaryFissionGames Apr 10, 2024
47f1987
add chlog
BinaryFissionGames Apr 10, 2024
e277407
add readme section about custom messages
BinaryFissionGames Apr 10, 2024
5ece376
fix lint errors
BinaryFissionGames Apr 10, 2024
4a2d18a
make goporto
BinaryFissionGames Apr 15, 2024
af558b1
fix incorrect tracking issue
BinaryFissionGames Apr 16, 2024
df759d4
Rework interface based on feedback
BinaryFissionGames Apr 17, 2024
c31315c
callback -> listener
BinaryFissionGames Apr 17, 2024
0aa8fba
Revert "callback -> listener"
BinaryFissionGames Apr 17, 2024
9f7de59
Revert "Rework interface based on feedback"
BinaryFissionGames Apr 17, 2024
bcb17e3
refactor to a loose unregister function
BinaryFissionGames Apr 17, 2024
d506ea7
comment/naming cleanup
BinaryFissionGames Apr 17, 2024
07597d3
Refactor into suggested interface
BinaryFissionGames Apr 23, 2024
448cf87
fix typo in readme example
BinaryFissionGames Apr 23, 2024
1847e41
fix spelling error in comment
BinaryFissionGames Apr 23, 2024
2629cf8
Fix readme indentation
BinaryFissionGames Apr 24, 2024
ff42ce1
Fix comments, unexport withMaxQueuedMessages
BinaryFissionGames Apr 24, 2024
281ee8d
add implementation assertions
BinaryFissionGames Apr 24, 2024
0e3d8b1
rename customeMessageSender -> customMessageHandler
BinaryFissionGames Apr 24, 2024
1f8921d
Check channels are empty in multi capability test
BinaryFissionGames Apr 24, 2024
5166988
newCustomMessageSender -> newCustomMessageHandler
BinaryFissionGames Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
opamp custom messages prototype
  • Loading branch information
BinaryFissionGames committed May 3, 2024
commit b5fd947754dc9e70f2e92bbe9a9d1985272b801b
19 changes: 16 additions & 3 deletions extension/opampextension/custom_messages.go
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package opampextension

import "github.com/open-telemetry/opamp-go/protobufs"
import (
"github.com/open-telemetry/opamp-go/protobufs"
)

// CustomMessageCallback is a callback that handles a custom message from opamp.
type CustomMessageCallback func(*protobufs.CustomMessage)
Expand All @@ -11,15 +13,26 @@ type CustomCapabilityRegistry interface {
// will be received by the given callback asynchronously.
// It returns a handle to a CustomCapability, which can be used to unregister, or send
// a message to the OpAMP server.
Register(capability string, callback CustomMessageCallback) CustomCapability
Register(capability string, callback CustomMessageCallback) (CustomCapability, error)
}

// CustomCapability represents a handle to a custom capability.
// This can be used to send a custom message to an OpAMP server, or to unregister
// the capability from the registry.
type CustomCapability interface {
// SendMessage sends a custom message to the OpAMP server.
SendMessage(messageType string, message []byte) error
//
// Only one message can be sent at a time. If SendCustomMessage has been already called
// for any capability, and the message is still pending (in progress)
// then subsequent calls to SendCustomMessage will return github.com/open-telemetry/opamp-go/types.ErrCustomMessagePending
// and a channel that will be closed when the pending message is sent.
// To ensure that the previous send is complete and it is safe to send another CustomMessage,
// the caller should wait for the returned channel to be closed before attempting to send another custom message.
//
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error)

// Unregister unregisters the custom capability.
Unregister()
}
22 changes: 15 additions & 7 deletions extension/opampextension/opamp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type opampAgent struct {
agentDescription *protobufs.AgentDescription

opampClient client.OpAMPClient

customCapabilityRegistry *customCapabilityRegistry
}

func (o *opampAgent) Start(ctx context.Context, _ component.Host) error {
Expand Down Expand Up @@ -121,6 +123,10 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error
return nil
}

func (o *opampAgent) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) {
return o.customCapabilityRegistry.Register(capability, callback)
}

func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) {
o.eclk.Lock()
defer o.eclk.Unlock()
Expand Down Expand Up @@ -162,14 +168,16 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r
}
}

opampClient := cfg.Server.GetClient(logger)
agent := &opampAgent{
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: cfg.Server.GetClient(logger),
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: opampClient,
customCapabilityRegistry: newCustomCapabilityRegistry(logger, opampClient),
}

return agent, nil
Expand Down
170 changes: 170 additions & 0 deletions extension/opampextension/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package opampextension

import (
"container/list"
"errors"
"fmt"
"slices"
"sync"

"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type customCapabilityRegistry struct {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
mux *sync.Mutex
capabilityToCallbacks map[string]*list.List
opampClient client.OpAMPClient
logger *zap.Logger
}

func newCustomCapabilityRegistry(logger *zap.Logger, opampClient client.OpAMPClient) *customCapabilityRegistry {
return &customCapabilityRegistry{
mux: &sync.Mutex{},
capabilityToCallbacks: make(map[string]*list.List),
opampClient: opampClient,
logger: logger,
}
}

// Register registers a new custom capability for the
func (cr *customCapabilityRegistry) Register(capability string, callback CustomMessageCallback) (CustomCapability, error) {
cr.mux.Lock()
defer cr.mux.Unlock()

capabilities := cr.capabilities()
if !slices.Contains(capabilities, capability) {
capabilities = append(capabilities, capability)
}

err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
return nil, fmt.Errorf("set custom capabilities: %w", err)
}

capabilityList := cr.capabilityToCallbacks[capability]
if capabilityList == nil {
capabilityList = list.New()
cr.capabilityToCallbacks[capability] = capabilityList
}

callbackElem := capabilityList.PushBack(callback)

cc := newCustomCapability(cr, cr.opampClient, capability)

// note: We'll have to set the self element in order for the custom capability to remove itself.
cc.selfElement = callbackElem

return cc, nil
}

// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered callbacks for
// the messages capability.
func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) {
cr.mux.Lock()
defer cr.mux.Unlock()

callbacks, ok := cr.capabilityToCallbacks[cm.Capability]
if !ok {
return
}

for node := callbacks.Front(); node != nil; node = node.Next() {
cb, ok := node.Value.(CustomMessageCallback)
if !ok {
continue
}

// Let the callback process asynchronously in a separate goroutine so it can't block
// the opamp extension
go cb(cm)
}
}

// RemoveCustomCapability removes the custom capability with the given callback list element,
// then recalculates and sets the list of custom capabilities on the OpAMP client.
func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, callbackElement *list.Element) {
cr.mux.Lock()
defer cr.mux.Unlock()

callbackList := cr.capabilityToCallbacks[capability]
callbackList.Remove(callbackElement)

if callbackList.Front() == nil {
// Since there are no more callbacks for this capability,
// this capability is no longer supported
delete(cr.capabilityToCallbacks, capability)
}

capabilities := cr.capabilities()
err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
// It's OK if we couldn't actually remove the capability, it just means we won't
// notify the server properly, and the server may send us messages that we have no associated callbacks for.
cr.logger.Error("Failed to set new capabilities", zap.Error(err))
}
}

// capabilities gives the current set of custom capabilities with at least one
// callback registered.
func (cr *customCapabilityRegistry) capabilities() []string {
return maps.Keys(cr.capabilityToCallbacks)
}

type customCapabilityHandler struct {
// unregisteredMux protects unregistered, and makes sure that a message cannot be sent
// on an unregistered capability.
unregisteredMux *sync.Mutex

capability string
selfElement *list.Element
opampClient client.OpAMPClient
registry *customCapabilityRegistry
unregistered bool
}

func newCustomCapability(
registry *customCapabilityRegistry,
opampClient client.OpAMPClient,
capability string,
) *customCapabilityHandler {
return &customCapabilityHandler{
unregisteredMux: &sync.Mutex{},

capability: capability,
opampClient: opampClient,
registry: registry,
}
}

// SendMessage synchronously sends the message
func (c *customCapabilityHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

if c.unregistered {
return nil, errors.New("capability has already been unregistered")
}

cm := &protobufs.CustomMessage{
Capability: c.capability,
Type: messageType,
Data: message,
}

return c.opampClient.SendCustomMessage(cm)
}

func (c *customCapabilityHandler) Unregister() {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

c.unregistered = true
c.registry.RemoveCustomCapability(c.capability, c.selfElement)
}