Skip to content

Commit

Permalink
Add HTTP Client to opampextension (open-telemetry#31392)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Ziqi Zhao <[email protected]>
  • Loading branch information
jaronoff97 and fatsheep9146 committed Feb 28, 2024
1 parent bd79775 commit bcf64e6
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 35 deletions.
27 changes: 27 additions & 0 deletions .chloggen/opamp-http.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: enables creating and using an http client

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31389]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
79 changes: 71 additions & 8 deletions extension/opampextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec

import (
"errors"
"net/url"

"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/protobufs"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)

// Config contains the configuration for the opamp extension. Trying to mirror
Expand Down Expand Up @@ -41,22 +44,82 @@ func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities {
return agentCapabilities
}

type commonFields struct {
Endpoint string `mapstructure:"endpoint"`
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
Headers map[string]configopaque.String `mapstructure:"headers,omitempty"`
}

// OpAMPServer contains the OpAMP transport configuration.
type OpAMPServer struct {
WS *OpAMPWebsocket `mapstructure:"ws"`
WS *commonFields `mapstructure:"ws,omitempty"`
HTTP *commonFields `mapstructure:"http,omitempty"`
}

// OpAMPWebsocket contains the OpAMP websocket transport configuration.
type OpAMPWebsocket struct {
Endpoint string `mapstructure:"endpoint"`
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
Headers map[string]configopaque.String `mapstructure:"headers,omitempty"`
func (c *commonFields) Scheme() string {
uri, err := url.ParseRequestURI(c.Endpoint)
if err != nil {
return ""
}
return uri.Scheme
}

func (c *commonFields) Validate() error {
if c.Endpoint == "" {
return errors.New("opamp server endpoint must be provided")
}
return nil
}

func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient {
if s.WS != nil {
return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws"))))
}
return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http"))))
}

func (s OpAMPServer) GetHeaders() map[string]configopaque.String {
if s.WS != nil {
return s.WS.Headers
} else if s.HTTP != nil {
return s.HTTP.Headers
}
return map[string]configopaque.String{}
}

func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting {
if s.WS != nil {
return s.WS.TLSSetting
} else if s.HTTP != nil {
return s.HTTP.TLSSetting
}
return configtls.TLSClientSetting{}
}

func (s OpAMPServer) GetEndpoint() string {
if s.WS != nil {
return s.WS.Endpoint
} else if s.HTTP != nil {
return s.HTTP.Endpoint
}
return ""
}

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if cfg.Server.WS.Endpoint == "" {
return errors.New("opamp server websocket endpoint must be provided")
switch {
case cfg.Server.WS == nil && cfg.Server.HTTP == nil:
return errors.New("opamp server must have at least ws or http set")
case cfg.Server.WS != nil && cfg.Server.HTTP != nil:
return errors.New("opamp server must have only ws or http set")
case cfg.Server.WS != nil:
if err := cfg.Server.WS.Validate(); err != nil {
return err
}
case cfg.Server.HTTP != nil:
if err := cfg.Server.HTTP.Validate(); err != nil {
return err
}
}

if cfg.InstanceUID != "" {
Expand Down
230 changes: 215 additions & 15 deletions extension/opampextension/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
)
Expand All @@ -30,7 +32,7 @@ func TestUnmarshalConfig(t *testing.T) {
assert.Equal(t,
&Config{
Server: &OpAMPServer{
WS: &OpAMPWebsocket{
WS: &commonFields{
Endpoint: "wss:https://127.0.0.1:4320/v1/opamp",
},
},
Expand All @@ -41,20 +43,218 @@ func TestUnmarshalConfig(t *testing.T) {
}, cfg)
}

func TestConfigValidate(t *testing.T) {
cfg := &Config{
Server: &OpAMPServer{
WS: &OpAMPWebsocket{},
func TestUnmarshalHttpConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_http.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.Equal(t,
&Config{
Server: &OpAMPServer{
HTTP: &commonFields{
Endpoint: "https://127.0.0.1:4320/v1/opamp",
},
},
InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ",
Capabilities: Capabilities{
ReportsEffectiveConfig: true,
},
}, cfg)
}

func TestConfig_Getters(t *testing.T) {
type fields struct {
Server *OpAMPServer
}
type expected struct {
headers assert.ValueAssertionFunc
tls assert.ValueAssertionFunc
endpoint assert.ValueAssertionFunc
}
tests := []struct {
name string
fields fields
expected expected
}{
{
name: "nothing set",
fields: fields{
Server: &OpAMPServer{},
},
expected: expected{
headers: assert.Empty,
tls: assert.Empty,
endpoint: assert.Empty,
},
},
{
name: "WS valid endpoint, headers, tls",
fields: fields{
Server: &OpAMPServer{
WS: &commonFields{
Endpoint: "wss:https://127.0.0.1:4320/v1/opamp",
Headers: map[string]configopaque.String{
"test": configopaque.String("test"),
},
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
},
},
},
expected: expected{
headers: assert.NotEmpty,
tls: assert.NotEmpty,
endpoint: assert.NotEmpty,
},
},
{
name: "HTTP valid endpoint and valid instance id",
fields: fields{
Server: &OpAMPServer{
HTTP: &commonFields{
Endpoint: "https://127.0.0.1:4320/v1/opamp",
Headers: map[string]configopaque.String{
"test": configopaque.String("test"),
},
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
},
},
},
expected: expected{
headers: assert.NotEmpty,
tls: assert.NotEmpty,
endpoint: assert.NotEmpty,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.expected.headers(t, tt.fields.Server.GetHeaders())
tt.expected.tls(t, tt.fields.Server.GetTLSSetting())
tt.expected.endpoint(t, tt.fields.Server.GetEndpoint())
})
}
}

func TestConfig_Validate(t *testing.T) {
type fields struct {
Server *OpAMPServer
InstanceUID string
Capabilities Capabilities
}
tests := []struct {
name string
fields fields
wantErr assert.ErrorAssertionFunc
}{
{
name: "WS must have endpoint",
fields: fields{
Server: &OpAMPServer{
WS: &commonFields{},
},
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp server endpoint must be provided", err.Error())
},
},
{
name: "WS valid endpoint and invalid instance id",
fields: fields{
Server: &OpAMPServer{
WS: &commonFields{
Endpoint: "wss:https://127.0.0.1:4320/v1/opamp",
},
},
InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL",
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp instance_uid is invalid", err.Error())
},
},
{
name: "WS valid endpoint and valid instance id",
fields: fields{
Server: &OpAMPServer{
WS: &commonFields{
Endpoint: "wss:https://127.0.0.1:4320/v1/opamp",
},
},
InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ",
},
wantErr: assert.NoError,
},
{
name: "HTTP must have endpoint",
fields: fields{
Server: &OpAMPServer{
HTTP: &commonFields{},
},
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp server endpoint must be provided", err.Error())
},
},
{
name: "HTTP valid endpoint and invalid instance id",
fields: fields{
Server: &OpAMPServer{
HTTP: &commonFields{
Endpoint: "https://127.0.0.1:4320/v1/opamp",
},
},
InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL",
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp instance_uid is invalid", err.Error())
},
},
{
name: "HTTP valid endpoint and valid instance id",
fields: fields{
Server: &OpAMPServer{
HTTP: &commonFields{
Endpoint: "https://127.0.0.1:4320/v1/opamp",
},
},
InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ",
},
wantErr: assert.NoError,
},
{
name: "neither config set",
fields: fields{
Server: &OpAMPServer{},
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp server must have at least ws or http set", err.Error())
},
},
{
name: "both config set",
fields: fields{
Server: &OpAMPServer{
WS: &commonFields{},
HTTP: &commonFields{},
},
},
wantErr: func(t assert.TestingT, err error, _ ...any) bool {
return assert.Equal(t, "opamp server must have only ws or http set", err.Error())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{
Server: tt.fields.Server,
InstanceUID: tt.fields.InstanceUID,
Capabilities: tt.fields.Capabilities,
}
tt.wantErr(t, cfg.Validate())
})
}
err := cfg.Validate()
assert.Equal(t, "opamp server websocket endpoint must be provided", err.Error())
cfg.Server.WS.Endpoint = "wss:https://127.0.0.1:4320/v1/opamp"
assert.NoError(t, cfg.Validate())
cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL"
err = cfg.Validate()
require.Error(t, err)
assert.Equal(t, "opamp instance_uid is invalid", err.Error())
cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZ"
require.NoError(t, cfg.Validate())
}
4 changes: 1 addition & 3 deletions extension/opampextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
Server: &OpAMPServer{
WS: &OpAMPWebsocket{},
},
Server: &OpAMPServer{},
Capabilities: Capabilities{
ReportsEffectiveConfig: true,
},
Expand Down
Loading

0 comments on commit bcf64e6

Please sign in to comment.