forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 5
/
config.go
152 lines (127 loc) · 4.61 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension"
import (
"errors"
"net/url"
"time"
"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/protobufs"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)
// Config contains the configuration for the opamp extension. Trying to mirror
// the OpAMP supervisor config for some consistency.
type Config struct {
Server *OpAMPServer `mapstructure:"server"`
// InstanceUID is a ULID formatted as a 26 character string in canonical
// representation. Auto-generated on start if missing.
InstanceUID string `mapstructure:"instance_uid"`
// Capabilities contains options to enable a particular OpAMP capability
Capabilities Capabilities `mapstructure:"capabilities"`
// Agent descriptions contains options to modify the AgentDescription message
AgentDescription AgentDescription `mapstructure:"agent_description"`
// PPID is the process ID of the parent for the collector. If the PPID is specified,
// the extension will continuously poll for the status of the parent process, and emit a fatal error
// when the parent process is no longer running.
// If unspecified, the orphan detection logic does not run.
PPID int32 `mapstructure:"ppid"`
// PPIDPollInterval is the time between polling for whether PPID is running.
PPIDPollInterval time.Duration `mapstructure:"ppid_poll_interval"`
}
type AgentDescription struct {
// NonIdentifyingAttributes are a map of key-value pairs that may be specified to provide
// extra information about the agent to the OpAMP server.
NonIdentifyingAttributes map[string]string `mapstructure:"non_identifying_attributes"`
}
type Capabilities struct {
// ReportsEffectiveConfig enables the OpAMP ReportsEffectiveConfig Capability. (default: true)
ReportsEffectiveConfig bool `mapstructure:"reports_effective_config"`
}
func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities {
// All Agents MUST report status.
agentCapabilities := protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus
if caps.ReportsEffectiveConfig {
agentCapabilities |= protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig
}
return agentCapabilities
}
type commonFields struct {
Endpoint string `mapstructure:"endpoint"`
TLSSetting configtls.ClientConfig `mapstructure:"tls,omitempty"`
Headers map[string]configopaque.String `mapstructure:"headers,omitempty"`
}
// OpAMPServer contains the OpAMP transport configuration.
type OpAMPServer struct {
WS *commonFields `mapstructure:"ws,omitempty"`
HTTP *commonFields `mapstructure:"http,omitempty"`
}
func (c *commonFields) Scheme() string {
uri, err := url.ParseRequestURI(c.Endpoint)
if err != nil {
return ""
}
return uri.Scheme
}
func (c *commonFields) Validate() error {
if c.Endpoint == "" {
return errors.New("opamp server endpoint must be provided")
}
return nil
}
func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient {
if s.WS != nil {
return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws"))))
}
return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http"))))
}
func (s OpAMPServer) GetHeaders() map[string]configopaque.String {
if s.WS != nil {
return s.WS.Headers
} else if s.HTTP != nil {
return s.HTTP.Headers
}
return map[string]configopaque.String{}
}
func (s OpAMPServer) GetTLSSetting() configtls.ClientConfig {
if s.WS != nil {
return s.WS.TLSSetting
} else if s.HTTP != nil {
return s.HTTP.TLSSetting
}
return configtls.ClientConfig{}
}
func (s OpAMPServer) GetEndpoint() string {
if s.WS != nil {
return s.WS.Endpoint
} else if s.HTTP != nil {
return s.HTTP.Endpoint
}
return ""
}
// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
switch {
case cfg.Server.WS == nil && cfg.Server.HTTP == nil:
return errors.New("opamp server must have at least ws or http set")
case cfg.Server.WS != nil && cfg.Server.HTTP != nil:
return errors.New("opamp server must have only ws or http set")
case cfg.Server.WS != nil:
if err := cfg.Server.WS.Validate(); err != nil {
return err
}
case cfg.Server.HTTP != nil:
if err := cfg.Server.HTTP.Validate(); err != nil {
return err
}
}
if cfg.InstanceUID != "" {
_, err := ulid.ParseStrict(cfg.InstanceUID)
if err != nil {
return errors.New("opamp instance_uid is invalid")
}
}
return nil
}