Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/opamp]: Custom Message Support #32281

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5182859
custom message interface
BinaryFissionGames Mar 27, 2024
b5fd947
opamp custom messages prototype
BinaryFissionGames Apr 1, 2024
3901617
finish registry + tests
BinaryFissionGames Apr 9, 2024
b83d7c1
ensure extension is a valid registry
BinaryFissionGames Apr 9, 2024
0c1fe09
add license
BinaryFissionGames Apr 9, 2024
8b5f549
add check for unregistered in Unregister
BinaryFissionGames Apr 9, 2024
e204bcf
process custom messages with the registry when received.
BinaryFissionGames Apr 10, 2024
47f1987
add chlog
BinaryFissionGames Apr 10, 2024
e277407
add readme section about custom messages
BinaryFissionGames Apr 10, 2024
5ece376
fix lint errors
BinaryFissionGames Apr 10, 2024
4a2d18a
make goporto
BinaryFissionGames Apr 15, 2024
af558b1
fix incorrect tracking issue
BinaryFissionGames Apr 16, 2024
df759d4
Rework interface based on feedback
BinaryFissionGames Apr 17, 2024
c31315c
callback -> listener
BinaryFissionGames Apr 17, 2024
0aa8fba
Revert "callback -> listener"
BinaryFissionGames Apr 17, 2024
9f7de59
Revert "Rework interface based on feedback"
BinaryFissionGames Apr 17, 2024
bcb17e3
refactor to a loose unregister function
BinaryFissionGames Apr 17, 2024
d506ea7
comment/naming cleanup
BinaryFissionGames Apr 17, 2024
07597d3
Refactor into suggested interface
BinaryFissionGames Apr 23, 2024
448cf87
fix typo in readme example
BinaryFissionGames Apr 23, 2024
1847e41
fix spelling error in comment
BinaryFissionGames Apr 23, 2024
2629cf8
Fix readme indentation
BinaryFissionGames Apr 24, 2024
ff42ce1
Fix comments, unexport withMaxQueuedMessages
BinaryFissionGames Apr 24, 2024
281ee8d
add implementation assertions
BinaryFissionGames Apr 24, 2024
0e3d8b1
rename customeMessageSender -> customMessageHandler
BinaryFissionGames Apr 24, 2024
1f8921d
Check channels are empty in multi capability test
BinaryFissionGames Apr 24, 2024
5166988
newCustomMessageSender -> newCustomMessageHandler
BinaryFissionGames Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
finish registry + tests
  • Loading branch information
BinaryFissionGames committed May 3, 2024
commit 3901617493ce7dbfe6b280f344587ecfd9fc0380
23 changes: 14 additions & 9 deletions extension/opampextension/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,29 @@ import (
"slices"
"sync"

"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// customCapabilityClient is a subset of OpAMP client containing only the
type customCapabilityClient interface {
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
}

type customCapabilityRegistry struct {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
mux *sync.Mutex
capabilityToCallbacks map[string]*list.List
opampClient client.OpAMPClient
client customCapabilityClient
logger *zap.Logger
}

func newCustomCapabilityRegistry(logger *zap.Logger, opampClient client.OpAMPClient) *customCapabilityRegistry {
func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry {
return &customCapabilityRegistry{
mux: &sync.Mutex{},
capabilityToCallbacks: make(map[string]*list.List),
opampClient: opampClient,
client: client,
logger: logger,
}
}
Expand All @@ -39,7 +44,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM
capabilities = append(capabilities, capability)
}

err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{
err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
Expand All @@ -54,7 +59,7 @@ func (cr *customCapabilityRegistry) Register(capability string, callback CustomM

callbackElem := capabilityList.PushBack(callback)

cc := newCustomCapability(cr, cr.opampClient, capability)
cc := newCustomCapability(cr, cr.client, capability)

// note: We'll have to set the self element in order for the custom capability to remove itself.
cc.selfElement = callbackElem
Expand Down Expand Up @@ -101,7 +106,7 @@ func (cr *customCapabilityRegistry) RemoveCustomCapability(capability string, ca
}

capabilities := cr.capabilities()
err := cr.opampClient.SetCustomCapabilities(&protobufs.CustomCapabilities{
err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
Expand All @@ -124,14 +129,14 @@ type customCapabilityHandler struct {

capability string
selfElement *list.Element
opampClient client.OpAMPClient
opampClient customCapabilityClient
registry *customCapabilityRegistry
unregistered bool
}

func newCustomCapability(
registry *customCapabilityRegistry,
opampClient client.OpAMPClient,
opampClient customCapabilityClient,
capability string,
) *customCapabilityHandler {
return &customCapabilityHandler{
Expand Down
272 changes: 272 additions & 0 deletions extension/opampextension/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package opampextension

import (
"errors"
"fmt"
"testing"
"time"

"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestRegistry_Register(t *testing.T) {
t.Run("Registers successfully", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"

client := mockCustomCapabilityClient{
setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error {
require.Equal(t,
&protobufs.CustomCapabilities{
Capabilities: []string{capabilityString},
},
customCapabilities)
return nil
},
}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {})
require.NoError(t, err)
require.NotNil(t, capability)
})

t.Run("Setting capabilities fails", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
capabilityErr := errors.New("network error")

client := mockCustomCapabilityClient{
setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error {
return capabilityErr
},
}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

capability, err := registry.Register(capabilityString, func(*protobufs.CustomMessage) {})
require.Nil(t, capability)
require.ErrorIs(t, err, capabilityErr)
require.Len(t, registry.capabilityToCallbacks, 0, "Setting capability failed, but callback ended up in the map anyways")
})
}

func TestRegistry_ProcessMessage(t *testing.T) {
t.Run("Calls registered callback", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
messageType := "steep"
mesageBytes := []byte("blackTea")
customMessage := &protobufs.CustomMessage{
Capability: capabilityString,
Type: messageType,
Data: mesageBytes,
}

client := mockCustomCapabilityClient{}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

callbackCalledChan := make(chan struct{})
capability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) {
require.Equal(t, customMessage, c)

close(callbackCalledChan)
})
require.NotNil(t, capability)
require.NoError(t, err)

registry.ProcessMessage(customMessage)
select {
case <-time.After(2 * time.Second):
t.Fatalf("Timed out waiting for callback to be called")
case <-callbackCalledChan: // OK
}

})

t.Run("Callback is called only for its own capability", func(t *testing.T) {
teapotCapabilityString1 := "io.opentelemetry.teapot"
coffeeMakerCapabilityString2 := "io.opentelemetry.coffeeMaker"

messageType1 := "steep"
messageBytes1 := []byte("blackTea")

messageType2 := "brew"
messageBytes2 := []byte("blackCoffee")

customMessageSteep := &protobufs.CustomMessage{
Capability: teapotCapabilityString1,
Type: messageType1,
Data: messageBytes1,
}

customMessageBrew := &protobufs.CustomMessage{
Capability: coffeeMakerCapabilityString2,
Type: messageType2,
Data: messageBytes2,
}

client := mockCustomCapabilityClient{}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

teapotCalledChan := make(chan struct{})
capabilityTeapot, err := registry.Register(teapotCapabilityString1, func(c *protobufs.CustomMessage) {
require.Equal(t, customMessageSteep, c)

close(teapotCalledChan)
})
require.NotNil(t, capabilityTeapot)
require.NoError(t, err)

coffeeMakerCalledChan := make(chan struct{})
capabilityCoffeeMaker, err := registry.Register(coffeeMakerCapabilityString2, func(c *protobufs.CustomMessage) {
require.Equal(t, customMessageBrew, c)

close(coffeeMakerCalledChan)
})
require.NotNil(t, capabilityCoffeeMaker)
require.NoError(t, err)

registry.ProcessMessage(customMessageSteep)
registry.ProcessMessage(customMessageBrew)

select {
case <-time.After(2 * time.Second):
t.Fatalf("Timed out waiting for callback 1 to be called")
case <-coffeeMakerCalledChan: // OK
}
select {
case <-time.After(2 * time.Second):
t.Fatalf("Timed out waiting for callback 2 to be called")
case <-coffeeMakerCalledChan: // OK
}
})
}

func TestCustomCapability_SendMesage(t *testing.T) {
t.Run("Sends message", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
messageType := "brew"
mesageBytes := []byte("black")

client := mockCustomCapabilityClient{
sendCustomMessage: func(message *protobufs.CustomMessage) (chan struct{}, error) {
require.Equal(t, &protobufs.CustomMessage{
Capability: capabilityString,
Type: messageType,
Data: mesageBytes,
}, message)
return nil, nil
},
}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

capability, err := registry.Register(capabilityString, func(_ *protobufs.CustomMessage) {})
require.NoError(t, err)
require.NotNil(t, capability)

channel, err := capability.SendMessage(messageType, mesageBytes)
require.NoError(t, err)
require.Nil(t, channel, nil)
})
}

func TestCustomCapability_Unregister(t *testing.T) {
t.Run("Unregistered capability callback is no longer called", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
messageType := "steep"
mesageBytes := []byte("blackTea")
customMessage := &protobufs.CustomMessage{
Capability: capabilityString,
Type: messageType,
Data: mesageBytes,
}

client := mockCustomCapabilityClient{}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) {
t.Fatalf("Unregistered capability should not be called")
})
require.NotNil(t, unregisteredCapability)
require.NoError(t, err)

unregisteredCapability.Unregister()

registry.ProcessMessage(customMessage)
})

t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
messageType := "steep"
mesageBytes := []byte("blackTea")
customMessage := &protobufs.CustomMessage{
Capability: capabilityString,
Type: messageType,
Data: mesageBytes,
}

client := &mockCustomCapabilityClient{}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) {
t.Fatalf("Unregistered capability should not be called")
})
require.NotNil(t, unregisteredCapability)
require.NoError(t, err)

client.setCustomCapabilites = func(customCapabilities *protobufs.CustomCapabilities) error {
return fmt.Errorf("failed to set capabilities")
}

unregisteredCapability.Unregister()

registry.ProcessMessage(customMessage)
})

t.Run("Does not send if unregistered", func(t *testing.T) {
capabilityString := "io.opentelemetry.teapot"
messageType := "steep"
mesageBytes := []byte("blackTea")

client := mockCustomCapabilityClient{}

registry := newCustomCapabilityRegistry(zap.NewNop(), client)

unregisteredCapability, err := registry.Register(capabilityString, func(c *protobufs.CustomMessage) {
t.Fatalf("Unregistered capability should not be called")
})
require.NotNil(t, unregisteredCapability)
require.NoError(t, err)

unregisteredCapability.Unregister()

_, err = unregisteredCapability.SendMessage(messageType, mesageBytes)
require.ErrorContains(t, err, "capability has already been unregistered")
})
}

type mockCustomCapabilityClient struct {
sendCustomMessage func(message *protobufs.CustomMessage) (chan struct{}, error)
setCustomCapabilites func(customCapabilities *protobufs.CustomCapabilities) error
}

func (m mockCustomCapabilityClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error {
if m.setCustomCapabilites != nil {
return m.setCustomCapabilites(customCapabilities)
}
return nil
}

func (m mockCustomCapabilityClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) {
if m.sendCustomMessage != nil {
return m.sendCustomMessage(message)
}

return make(chan struct{}), nil
}