Skip to content

Commit

Permalink
[cmd/opampsupervisor]: Allow configuring agent description (open-tele…
Browse files Browse the repository at this point in the history
…metry#32819)

**Description:** <Describe what has changed.>
* Add options to the configuration that allows configuring the agent
description

**Link to tracking Issue:** Closes open-telemetry#32824

**Testing:** e2e test, unit tests for validation

**Documentation:**
* Updated spec with configuration parameters

---------

Co-authored-by: Tigran Najaryan <[email protected]>
  • Loading branch information
BinaryFissionGames and tigrannajaryan committed May 15, 2024
1 parent 78bd957 commit 4d5fbfb
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 3 deletions.
13 changes: 13 additions & 0 deletions .chloggen/feat_supervisor-agent-description.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds the ability to configure the agent description

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32824]
91 changes: 91 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,97 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
}, 5*time.Second, 250*time.Millisecond)
}

func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {
// Load the Supervisor config so we can get the location of
// the Collector that will be run.
var cfg config.Supervisor
cfgFile := getSupervisorConfig(t, "agent_description", map[string]string{})
k := koanf.New("::")
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
require.NoError(t, err)
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
Tag: "mapstructure",
})
require.NoError(t, err)

host, err := os.Hostname()
require.NoError(t, err)

// Get the binary name and version from the Collector binary
// using the `components` command that prints a YAML-encoded
// map of information about the Collector build. Some of this
// information will be used as defaults for the telemetry
// attributes.
agentPath := cfg.Agent.Executable
componentsInfo, err := exec.Command(agentPath, "components").Output()
require.NoError(t, err)
k = koanf.New("::")
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
require.NoError(t, err)
buildinfo := k.StringMap("buildinfo")
command := buildinfo["command"]
version := buildinfo["version"]

agentDescMessageChan := make(chan *protobufs.AgentToServer, 1)

server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
select {
case agentDescMessageChan <- message:
default:
}
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "agent_description", map[string]string{"url": server.addr})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)
var ad *protobufs.AgentToServer
select {
case ad = <-agentDescMessageChan:
case <-time.After(5 * time.Second):
t.Fatal("Failed to get agent description after 5 seconds")
}

expectedDescription := &protobufs.AgentDescription{
IdentifyingAttributes: []*protobufs.KeyValue{
stringKeyValue("client.id", "my-client-id"),
stringKeyValue(semconv.AttributeServiceInstanceID, ad.InstanceUid),
stringKeyValue(semconv.AttributeServiceName, command),
stringKeyValue(semconv.AttributeServiceVersion, version),
},
NonIdentifyingAttributes: []*protobufs.KeyValue{
stringKeyValue("env", "prod"),
stringKeyValue(semconv.AttributeHostArch, runtime.GOARCH),
stringKeyValue(semconv.AttributeHostName, host),
stringKeyValue(semconv.AttributeOSType, runtime.GOOS),
},
}

require.Equal(t, expectedDescription, ad.AgentDescription)

time.Sleep(250 * time.Millisecond)
}

func stringKeyValue(key, val string) *protobufs.KeyValue {
return &protobufs.KeyValue{
Key: key,
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: val,
},
},
}
}

// Creates a Collector config that reads and writes logs to files and provides
// file descriptors for I/O operations to those files. The files are placed
// in a unique temp directory that is cleaned up after the test's completion.
Expand Down
13 changes: 12 additions & 1 deletion cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ storage:
# and %ProgramData%/Otelcol/Supervisor on Windows.
directory: /path/to/dir

collector:
agent:
# Path to Collector executable. Required.
executable: /opt/otelcol/bin/otelcol

Expand All @@ -144,6 +144,17 @@ collector:
deny: \[/var/log/secret_logs\]
write:
allow: \[/var/otelcol\]

# Optional key-value pairs to add to either the identifying attributes or
# non-identifying attributes of the agent description sent to the OpAMP server.
# Values here override the values in the agent description retrieved from the collector's
# OpAMP extension (self-reported by the Collector).
description:
identifying_attributes:
client.id: "01HWWSK84BMT7J45663MBJMTPJ"
non_identifying_attributes:
custom.attribute: "custom-value"

```
### Executing Collector
Expand Down
8 changes: 7 additions & 1 deletion cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,11 @@ type OpAMPServer struct {
}

type Agent struct {
Executable string
Executable string
Description AgentDescription `mapstructure:"description"`
}

type AgentDescription struct {
IdentifyingAttributes map[string]string `mapstructure:"identifying_attributes"`
NonIdentifyingAttributes map[string]string `mapstructure:"non_identifying_attributes"`
}
47 changes: 46 additions & 1 deletion cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) {
if message.AgentDescription != nil {
instanceIDSeen := false
s.agentDescription = message.AgentDescription
s.setAgentDescription(message.AgentDescription)
identAttr := s.agentDescription.IdentifyingAttributes

for _, attr := range identAttr {
Expand Down Expand Up @@ -431,6 +431,51 @@ func (s *Supervisor) startOpAMP() error {
return nil
}

// setAgentDescription sets the agent description, merging in any user-specified attributes from the supervisor configuration.
func (s *Supervisor) setAgentDescription(ad *protobufs.AgentDescription) {
ad.IdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.IdentifyingAttributes, ad.IdentifyingAttributes)
ad.NonIdentifyingAttributes = applyKeyValueOverrides(s.config.Agent.Description.NonIdentifyingAttributes, ad.NonIdentifyingAttributes)
s.agentDescription = ad
}

// applyKeyValueOverrides merges the overrides map into the array of key value pairs.
// If a key from overrides already exists in the array of key value pairs, it is overwritten by the value from the overrides map.
// An array of KeyValue pair is returned, with each key value pair having a distinct key.
func applyKeyValueOverrides(overrides map[string]string, orig []*protobufs.KeyValue) []*protobufs.KeyValue {
kvMap := make(map[string]*protobufs.KeyValue, len(orig)+len(overrides))

for _, kv := range orig {
kvMap[kv.Key] = kv
}

for k, v := range overrides {
kvMap[k] = &protobufs.KeyValue{
Key: k,
Value: &protobufs.AnyValue{
Value: &protobufs.AnyValue_StringValue{
StringValue: v,
},
},
}
}

// Sort keys for stable output, makes it easier to test.
keys := make([]string, 0, len(kvMap))
for k := range kvMap {
keys = append(keys, k)
}

sort.Strings(keys)

kvOut := make([]*protobufs.KeyValue, 0, len(kvMap))
for _, k := range keys {
v := kvMap[k]
kvOut = append(kvOut, v)
}

return kvOut
}

func (s *Supervisor) stopOpAMP() error {
s.logger.Debug("Stopping OpAMP client...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
server:
endpoint: ws:https://{{.url}}/v1/opamp
tls:
insecure: true

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
description:
identifying_attributes:
client.id: "my-client-id"
non_identifying_attributes:
env: "prod"

0 comments on commit 4d5fbfb

Please sign in to comment.