Skip to content

Commit

Permalink
Add path prefix for otlp http receiver (#7570)
Browse files Browse the repository at this point in the history
Supports multiple otlp http receivers running behind a reverse
proxy/ingress.

Fixes #7511 

Updated existing tests to account for new parameter.
  • Loading branch information
fredthomsen committed Jul 21, 2023
1 parent a18c67d commit 5f11443
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 56 deletions.
16 changes: 16 additions & 0 deletions .chloggen/otlp-receiver-add-http-signal-url-paths.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add http url paths per signal config options to otlpreceiver

# One or more tracking issues or pull requests related to the change
issues: [7511]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
12 changes: 9 additions & 3 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,15 @@ The OTLP receiver can receive trace export calls via HTTP/JSON in addition to
gRPC. The HTTP/JSON address is the same as gRPC as the protocol is recognized
and processed accordingly. Note the serialization format needs to be [protobuf JSON](https://developers.google.com/protocol-buffers/docs/proto3#json).

To write traces with HTTP/JSON, `POST` to `[address]/v1/traces` for traces,
to `[address]/v1/metrics` for metrics, to `[address]/v1/logs` for logs. The default
port is `4318`.
The HTTP/JSON configuration also provides `traces_url_path`, `metrics_url_path`, and `logs_url_path`
configuration to allow the URL paths that signal data needs to be sent to be modified per signal type. These default to
`/v1/traces`, `/v1/metrics`, and `/v1/logs` respectively.

To write traces with HTTP/JSON, `POST` to `[address]/[traces_url_path]` for traces,
to `[address]/[metrics_url_path]` for metrics, to `[address]/[logs_url_path]` for logs.
The default port is `4318`. When using the `otlphttpexporter` peer to communicate with this component,
use the `traces_endpoint`, `metrics_endpoint`, and `logs_endpoint` settings in the `otlphttpexporter` to set the
proper URL to match the address and URL signal path on the `otlpreceiver`.

### CORS (Cross-origin resource sharing)

Expand Down
43 changes: 42 additions & 1 deletion receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlprecei

import (
"errors"
"fmt"
"net/url"
"path"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -18,10 +21,23 @@ const (
protoHTTP = "protocols::http"
)

type httpServerSettings struct {
*confighttp.HTTPServerSettings `mapstructure:",squash"`

// The URL path to receive traces on. If omitted "/v1/traces" will be used.
TracesURLPath string `mapstructure:"traces_url_path,omitempty"`

// The URL path to receive metrics on. If omitted "/v1/metrics" will be used.
MetricsURLPath string `mapstructure:"metrics_url_path,omitempty"`

// The URL path to receive logs on. If omitted "/v1/logs" will be used.
LogsURLPath string `mapstructure:"logs_url_path,omitempty"`
}

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
HTTP *httpServerSettings `mapstructure:"http"`
}

// Config defines configuration for OTLP receiver.
Expand Down Expand Up @@ -55,7 +71,32 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {

if !conf.IsSet(protoHTTP) {
cfg.HTTP = nil
} else {
var err error

if cfg.HTTP.TracesURLPath, err = sanitizeURLPath(cfg.HTTP.TracesURLPath); err != nil {
return err
}
if cfg.HTTP.MetricsURLPath, err = sanitizeURLPath(cfg.HTTP.MetricsURLPath); err != nil {
return err
}
if cfg.HTTP.LogsURLPath, err = sanitizeURLPath(cfg.HTTP.LogsURLPath); err != nil {
return err
}
}

return nil
}

// Verify signal URL path sanity
func sanitizeURLPath(urlPath string) (string, error) {
u, err := url.Parse(urlPath)
if err != nil {
return "", fmt.Errorf("invalid HTTP URL path set for signal: %w", err)
}

if !path.IsAbs(u.Path) {
u.Path = "/" + u.Path
}
return u.Path, nil
}
65 changes: 52 additions & 13 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,23 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
CORS: &confighttp.CORSSettings{
AllowedOrigins: []string{"https://*.test.com", "https://test.com"},
MaxAge: 7200,
},
},
CORS: &confighttp.CORSSettings{
AllowedOrigins: []string{"https://*.test.com", "https://test.com"},
MaxAge: 7200,
},
TracesURLPath: "/traces",
MetricsURLPath: "/v2/metrics",
LogsURLPath: "/log/ingest",
},
},
}, cfg)
Expand All @@ -149,9 +154,13 @@ func TestUnmarshalConfigUnix(t *testing.T) {
},
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
// Transport: "unix",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
},
},
}, cfg)
Expand Down Expand Up @@ -182,6 +191,36 @@ func TestUnmarshalConfigEmptyProtocols(t *testing.T) {
assert.EqualError(t, component.ValidateConfig(cfg), "must specify at least one protocol when using the OTLP receiver")
}

func TestUnmarshalConfigInvalidSignalPath(t *testing.T) {
tests := []struct {
name string
testDataFn string
}{
{
name: "Invalid traces URL path",
testDataFn: "invalid_traces_path.yaml",
},
{
name: "Invalid metrics URL path",
testDataFn: "invalid_metrics_path.yaml",
},
{
name: "Invalid logs URL path",
testDataFn: "invalid_logs_path.yaml",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", test.testDataFn))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.UnmarshalConfig(cm, cfg), "invalid HTTP URL path set for signal: parse \":invalid\": missing protocol scheme")
})
}
}

func TestUnmarshalConfigEmpty(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
Expand Down
13 changes: 11 additions & 2 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:4318"

defaultTracesURLPath = "/v1/traces"
defaultMetricsURLPath = "/v1/metrics"
defaultLogsURLPath = "/v1/logs"
)

// NewFactory creates a new OTLP receiver factory.
Expand All @@ -44,8 +48,13 @@ func createDefaultConfig() component.Config {
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPEndpoint,
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPEndpoint,
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
},
},
}
Expand Down
54 changes: 40 additions & 14 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ func TestCreateTracesReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
}

tests := []struct {
Expand Down Expand Up @@ -89,8 +94,11 @@ func TestCreateTracesReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "localhost:112233",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "localhost:112233",
},
TracesURLPath: defaultTracesURLPath,
},
},
},
Expand Down Expand Up @@ -124,8 +132,13 @@ func TestCreateMetricReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
}

tests := []struct {
Expand Down Expand Up @@ -162,8 +175,11 @@ func TestCreateMetricReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
MetricsURLPath: defaultMetricsURLPath,
},
},
},
Expand Down Expand Up @@ -196,8 +212,13 @@ func TestCreateLogReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
TracesURLPath: defaultTracesURLPath,
MetricsURLPath: defaultMetricsURLPath,
LogsURLPath: defaultLogsURLPath,
}

tests := []struct {
Expand Down Expand Up @@ -238,8 +259,11 @@ func TestCreateLogReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
LogsURLPath: defaultLogsURLPath,
},
},
},
Expand All @@ -251,8 +275,10 @@ func TestCreateLogReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "127.0.0.1:1122",
},
},
},
},
Expand Down
8 changes: 4 additions & 4 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
return err
}

err = r.startHTTPServer(r.cfg.HTTP, host)
err = r.startHTTPServer(r.cfg.HTTP.HTTPServerSettings, host)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
r.tracesReceiver = trace.New(tc, r.obsrepGRPC)
httpTracesReceiver := trace.New(tc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.TracesURLPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -214,7 +214,7 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
r.metricsReceiver = metrics.New(mc, r.obsrepGRPC)
httpMetricsReceiver := metrics.New(mc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.MetricsURLPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -239,7 +239,7 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
r.logsReceiver = logs.New(lc, r.obsrepGRPC)
httpLogsReceiver := logs.New(lc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/logs", func(resp http.ResponseWriter, req *http.Request) {
r.httpMux.HandleFunc(r.cfg.HTTP.LogsURLPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand Down
Loading

0 comments on commit 5f11443

Please sign in to comment.