diff --git a/receiver/sapmreceiver/README.md b/receiver/sapmreceiver/README.md index 3d6caaac3323c..17a22387f484d 100644 --- a/receiver/sapmreceiver/README.md +++ b/receiver/sapmreceiver/README.md @@ -12,6 +12,13 @@ Example: receivers: sapm: endpoint: localhost:7276 + tls: + cert_file: /test.crt + key_file: /test.key ``` * `endpoint`: Address and port that the SAPM receiver should bind to. Note that this must be 0.0.0.0: instead of localhost if you want to receive spans from sources exporting to IPs other than localhost on the same host. For example, when the collector is deployed as a k8s deployment and exposed using a service. +* `tls`: This is an optional object used to specify if TLS should be used for incoming connections. + * `cert_file`: Specifies the certificate file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection. + * `key_file`: Specifies the key file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection. + \ No newline at end of file diff --git a/receiver/sapmreceiver/config.go b/receiver/sapmreceiver/config.go index bd87f463d1b0b..d49332a654f4a 100644 --- a/receiver/sapmreceiver/config.go +++ b/receiver/sapmreceiver/config.go @@ -16,9 +16,14 @@ package sapmreceiver import ( "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" ) // Config defines configuration for SAPM receiver. type Config struct { - configmodels.ReceiverSettings `mapstructure:",squash"` + configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + + // Configures the receiver to use TLS. + // The default value is nil, which will cause the receiver to not use TLS. + TLSCredentials *configtls.TLSSetting `mapstructure:"tls, omitempty"` } diff --git a/receiver/sapmreceiver/config_test.go b/receiver/sapmreceiver/config_test.go index 7c62a37a0fc06..79ed9a8fdfbcb 100644 --- a/receiver/sapmreceiver/config_test.go +++ b/receiver/sapmreceiver/config_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" ) func TestLoadConfig(t *testing.T) { @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) { // The receiver `sapm/disabled` doesn't count because disabled receivers // are excluded from the final list. - assert.Equal(t, len(cfg.Receivers), 2) + assert.Equal(t, len(cfg.Receivers), 3) r0 := cfg.Receivers["sapm"] assert.Equal(t, r0, factory.CreateDefaultConfig()) @@ -46,9 +47,23 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: configmodels.Type(typeStr), + TypeVal: typeStr, NameVal: "sapm/customname", Endpoint: "0.0.0.0:7276", }, }) + + r2 := cfg.Receivers["sapm/tls"].(*Config) + assert.Equal(t, r2, + &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: "sapm/tls", + Endpoint: ":7276", + }, + TLSCredentials: &configtls.TLSSetting{ + CertFile: "/test.crt", + KeyFile: "/test.key", + }, + }) } diff --git a/receiver/sapmreceiver/factory.go b/receiver/sapmreceiver/factory.go index b27b3d74aa3a1..694d44e9e243c 100644 --- a/receiver/sapmreceiver/factory.go +++ b/receiver/sapmreceiver/factory.go @@ -54,7 +54,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { func (f *Factory) CreateDefaultConfig() configmodels.Receiver { return &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: configmodels.Type(typeStr), + TypeVal: typeStr, NameVal: typeStr, Endpoint: defaultEndpoint, }, diff --git a/receiver/sapmreceiver/testdata/config.yaml b/receiver/sapmreceiver/testdata/config.yaml index 69c9704f3f530..c52b7312e91d1 100644 --- a/receiver/sapmreceiver/testdata/config.yaml +++ b/receiver/sapmreceiver/testdata/config.yaml @@ -7,7 +7,14 @@ receivers: # Ex: `endpoint: "7276"` is incorrect. # Ex: `endpoint: "1.2.3.4:7276"` and ":7276" is correct sapm/customname: - endpoint: "0.0.0.0:7276" + endpoint: "0.0.0.0:7276" + + # The following demonstrates how to specify TLS for the receiver. + sapm/tls: + tls: + cert_file: /test.crt + key_file: /test.key + processors: exampleprocessor: diff --git a/receiver/sapmreceiver/testdata/testcert.crt b/receiver/sapmreceiver/testdata/testcert.crt new file mode 100644 index 0000000000000..668c82145c677 --- /dev/null +++ b/receiver/sapmreceiver/testdata/testcert.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDLDCCAhSgAwIBAgIJAO8ClxUckM5xMA0GCSqGSIb3DQEBCwUAMEsxCzAJBgNV +BAYTAk9UMRYwFAYDVQQIDA1PcGVuVGVsZW1ldHJ5MRAwDgYDVQQHDAdTZXJ2aWNl +MRIwEAYDVQQDDAkxMjcuMC4wLjEwHhcNMjAwNTA2MDAzMTQ0WhcNMjEwNTA2MDAz +MTQ0WjBLMQswCQYDVQQGEwJPVDEWMBQGA1UECAwNT3BlblRlbGVtZXRyeTEQMA4G +A1UEBwwHU2VydmljZTESMBAGA1UEAwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f0 +3kUvDpAUjGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3p +SSZVZmsIRRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI +9o47WSeyVALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlI +gIPFEOh68oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DM +qWj4gKC4CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABoxMwETAPBgNV +HREECDAGhwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQA2DT7VxpFLCqc0FHW16yZ9 +/9fQChEN5IBe6mjbysjeYhef8w7xP03wl6SwFXFpTQDFqz6KdolH+Zl4gm6MsrsX +nQNP8fkHfNtHr+28JPDPWC5aWx5xfdQ4ybfeEV+xdNmrKeKGOcQ+Nmjcx06ysy0J +OSjOBwyTvlCIOF5AnafFuCk81EnbMLQeIvsfudXTAi0aw6HMmS2UgmGUcwGsFDRt +Xx1n1nLclK73f2a9mrzrh1s3lMom1FLaZ8fRecB9cVZJAHdw7RDtkG0qFf+v9Hk0 +y8JKVfSmRIveHz62a/61T36VSKIO0qksdR4wEPCjADsdpagVMrwjyu8qShLFXRkn +-----END CERTIFICATE----- diff --git a/receiver/sapmreceiver/testdata/testkey.key b/receiver/sapmreceiver/testdata/testkey.key new file mode 100644 index 0000000000000..b83561755f08d --- /dev/null +++ b/receiver/sapmreceiver/testdata/testkey.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f03kUvDpAU +jGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3pSSZVZmsI +RRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI9o47WSey +VALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlIgIPFEOh6 +8oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DMqWj4gKC4 +CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABAoIBAG/6OMw8K2By4ObZ +JSpiFd87NlyXejsOCvIKGuAlrYlDqdzLvuImRO4XA0k3mLDVLeMKTeVqgbLo7vco ++c18ptNTkaxPBdcmZtPU0JXd9re9HpsMxVjI/1fA6M9yeWSE3XPafGpYVFcig140 +u8ahsmG/8JjU/KME6DS6Vg8dFsgrb0aJypv1VxA2n4xeWkbwVdniTlny4LFhwvvk +P4dgnGNhUHkeet+re9YZEkXJbNp9hXP/wdEwLJ5tUNI/exngxQaAJ6fP2e1htFoJ +a8TGTwjEwRIAocjI8ieHnUO4lARF8uhEzhSadtZNZvKgbNpE5xJNl9OH9pkQTjo2 +4CLOxpECgYEA8ZbV2o1EcdCKvIWAYhYbBHYe2gadcnO04gtA1bbGk/+YtzaiS20W +zq2qKDhrQJ6sCFe17CN9hM0gdkpsyS1nLtR3F3MVacYe8STh7BD61E5ens+wDWJU +zva3bS1CRMPthZ+i0xc4AOgXDZewD1rxb8rvT5SeXYRZGkvyh9TAzocCgYEA1VVC +lLy3EG/geiMAWd+FnTwWnMViGb8rDY03vJDSKEes9b8HqwEPsX0I3TVeyyYBM3Te +sPYqJxBFh5afU8DYhNue9UbJKSN4j1jg43pK58i99LT+lGRAVaJZLVbsWiEF3MwX +8muCtELAT7j9648oEUX4sF/nnz422Q0VPRS5s7cCgYAqrDHqALnuQJ/A3PPoX282 +QocAi9qTtMxmgQZauYYp7iPTeNsB56r3psU/hXesWlqYvqVrqHkrU/A/9LVyc4qe +QvkmMzW9ETm17oXZZMZpac5czuKR+qRwSjPsHOpvqwvxZlkkYB2MS3KG/BwlGjM7 +Q+UxcVbnvdDfTDrysym7UQKBgQDFeVPVjM7EZ1tak7XKe68aOjoQSmIhxSTcOYGD +imcPJDIFlRxK/gOB32TqJ3IlCHwKHr/Y/TVNzbEe7p1zkMqcSRPepfSloREDWFls +GJLn9ZlowHX79MTcwBhecNz+HR1pIn90RnLJ3BRad7qMZ4rGWof28//bF2L8DjE/ +xkSUBwKBgQCAki3+tscHa1ZQ+VZmvTeQ9gwNZt+cm0FzavhjnvWZAKCrKwg+MvAN +BifZjKsPK7u9u1QGuPP0Zn/Zw3VoMlrC/Paa/OZzwTtb2yN824wbMo0Qvm5WciS7 +DXyNy887h1NLGEyMh6rGUutmI6OPf8WaLqcxrz16dhtr9+N6YDZ/tQ== +-----END RSA PRIVATE KEY----- diff --git a/receiver/sapmreceiver/trace_receiver.go b/receiver/sapmreceiver/trace_receiver.go index 449c94b9700e6..274f955196631 100644 --- a/receiver/sapmreceiver/trace_receiver.go +++ b/receiver/sapmreceiver/trace_receiver.go @@ -69,8 +69,12 @@ func (sr *sapmReceiver) handleRequest(ctx context.Context, req *http.Request) er return err } - ctx = obsreport.ReceiverContext(ctx, sr.config.Name(), "http", "") - ctx = obsreport.StartTraceDataReceiveOp(ctx, sr.config.Name(), "http") + transport := "http" + if sr.config.TLSCredentials != nil { + transport = "https" + } + ctx = obsreport.ReceiverContext(ctx, sr.config.Name(), transport, "") + ctx = obsreport.StartTraceDataReceiveOp(ctx, sr.config.Name(), transport) td := jaegertranslator.ProtoBatchesToInternalTraces(sapm.Batches) @@ -170,7 +174,11 @@ func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error { // run the server on a routine go func() { - host.ReportFatalError(sr.server.Serve(ln)) + if sr.config.TLSCredentials != nil { + host.ReportFatalError(sr.server.ServeTLS(ln, sr.config.TLSCredentials.CertFile, sr.config.TLSCredentials.KeyFile)) + } else { + host.ReportFatalError(sr.server.Serve(ln)) + } }() }) return err diff --git a/receiver/sapmreceiver/trace_receiver_test.go b/receiver/sapmreceiver/trace_receiver_test.go index 87e8ad7ed4115..3a0d134b2fd3d 100644 --- a/receiver/sapmreceiver/trace_receiver_test.go +++ b/receiver/sapmreceiver/trace_receiver_test.go @@ -18,8 +18,11 @@ import ( "bytes" "compress/gzip" "context" + "crypto/tls" + "crypto/x509" "encoding/binary" "fmt" + "io/ioutil" "net/http" "testing" "time" @@ -30,12 +33,15 @@ import ( splunksapm "github.com/signalfx/sapm-proto/gen" "github.com/signalfx/sapm-proto/sapmprotocol" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opencensus.io/trace" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/testutils" "go.opentelemetry.io/collector/translator/conventions" tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.uber.org/zap" @@ -134,7 +140,7 @@ func grpcFixture(t1 time.Time, d1, d2 time.Duration) *model.Batch { } // sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future. -func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) (*http.Response, error) { +func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, tlsEnabled bool) (*http.Response, error) { // marshal the sapm reqBytes, err := proto.Marshal(sapm) if err != nil { @@ -163,7 +169,11 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) ( } // build the request - req, _ := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s%s", endpoint, sapmprotocol.TraceEndpointV2), bytes.NewReader(reqBytes)) + url := fmt.Sprintf("http://%s%s", endpoint, sapmprotocol.TraceEndpointV2) + if tlsEnabled { + url = fmt.Sprintf("https://%s%s", endpoint, sapmprotocol.TraceEndpointV2) + } + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(reqBytes)) req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue) // set headers for gzip @@ -174,9 +184,24 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) ( // send the request client := &http.Client{} + + if tlsEnabled { + caCert, errCert := ioutil.ReadFile("./testdata/testcert.crt") + if errCert != nil { + return nil, fmt.Errorf("failed to load certificate: %s", errCert.Error()) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + client.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + }, + } + } + resp, err := client.Do(req) if err != nil { - return resp, fmt.Errorf("failed to send request to receiver %v", resp) + return resp, fmt.Errorf("failed to send request to receiver %v", err) } return resp, nil @@ -186,11 +211,13 @@ func TestReception(t *testing.T) { now := time.Unix(1542158650, 536343000).UTC() nowPlus10min := now.Add(10 * time.Minute) nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second) + tlsAddress := testutils.GetAvailableLocalAddress(t) type args struct { config *Config sapm *splunksapm.PostSpansRequest zipped bool + useTLS bool } tests := []struct { name string @@ -206,6 +233,7 @@ func TestReception(t *testing.T) { }, sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}}, zipped: false, + useTLS: false, }, want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), }, @@ -217,6 +245,24 @@ func TestReception(t *testing.T) { }, sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}}, zipped: true, + useTLS: false, + }, + want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), + }, + { + name: "connect via TLS compressed sapm", + args: args{ + config: &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: tlsAddress}, + TLSCredentials: &configtls.TLSSetting{ + CertFile: ("./testdata/testcert.crt"), + KeyFile: ("./testdata/testkey.key"), + }, + }, + sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}}, + zipped: false, + useTLS: true, }, want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), }, @@ -232,13 +278,22 @@ func TestReception(t *testing.T) { t.Log("Starting") defer sr.Shutdown(context.Background()) - assert.NoError(t, sr.Start(context.Background(), componenttest.NewNopHost()), "should not have failed to start trace reception") + // NewNopHost swallows errors so using NewErrorWaitingHost to catch any potential errors starting the + // receiver. + mh := componenttest.NewErrorWaitingHost() + require.NoError(t, sr.Start(context.Background(), mh), "should not have failed to start trace reception") + + // If there are errors reported through host.ReportFatalError() this will retrieve it. + receivedError, receivedErr := mh.WaitForFatalError(500 * time.Millisecond) + require.NoError(t, receivedErr, "should not have failed to start trace reception") + require.False(t, receivedError) t.Log("Trace Reception Started") t.Log("Sending Sapm Request") var resp *http.Response - resp, err = sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped) - assert.NoError(t, err, fmt.Sprintf("should not have failed when sending sapm %v", resp)) + resp, err = sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped, tt.args.useTLS) + require.NoErrorf(t, err, "should not have failed when sending sapm %v", err) + assert.Equal(t, 200, resp.StatusCode) t.Log("SAPM Request Received") // retrieve received traces diff --git a/receiver/signalfxreceiver/README.md b/receiver/signalfxreceiver/README.md new file mode 100644 index 0000000000000..893c3625443af --- /dev/null +++ b/receiver/signalfxreceiver/README.md @@ -0,0 +1,23 @@ +# SignalFx Receiver + +The SignalFx receiver accepts metrics in the [SignalFx proto format](https://github.com/signalfx/com_signalfx_metrics_protobuf). +This allows the collector to receiver metrics from other collectors or the +SignalFx Smart Agent. + +## Configuration + +Example: + +```yaml +receivers: + signalfx: + endpoint: localhost:7276 + tls: + cert_file: /test.crt + key_file: /test.key +``` + +* `endpoint`: Address and port that the SignalFx receiver should bind to. Note that this must be 0.0.0.0: instead of localhost if you want to receive spans from sources exporting to IPs other than localhost on the same host. For example, when the collector is deployed as a k8s deployment and exposed using a service. +* `tls`: This is an optional object used to specify if TLS should be used for incoming connections. + * `cert_file`: Specifies the certificate file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection. + * `key_file`: Specifies the key file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection. diff --git a/receiver/signalfxreceiver/config.go b/receiver/signalfxreceiver/config.go index 7e04a5295a332..b51b1622461bf 100644 --- a/receiver/signalfxreceiver/config.go +++ b/receiver/signalfxreceiver/config.go @@ -14,9 +14,16 @@ package signalfxreceiver -import "go.opentelemetry.io/collector/config/configmodels" +import ( + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" +) // Config defines configuration for the SignalFx receiver. type Config struct { - configmodels.ReceiverSettings `mapstructure:",squash"` + configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + + // Configures the receiver to use TLS. + // The default value is nil, which will cause the receiver to not use TLS. + TLSCredentials *configtls.TLSSetting `mapstructure:"tls, omitempty"` } diff --git a/receiver/signalfxreceiver/config_test.go b/receiver/signalfxreceiver/config_test.go index 9969309c27c5f..19209497a381d 100644 --- a/receiver/signalfxreceiver/config_test.go +++ b/receiver/signalfxreceiver/config_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" ) func TestLoadConfig(t *testing.T) { @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, len(cfg.Receivers), 2) + assert.Equal(t, len(cfg.Receivers), 3) r0 := cfg.Receivers["signalfx"] assert.Equal(t, r0, factory.CreateDefaultConfig()) @@ -46,9 +47,22 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: configmodels.Type(typeStr), + TypeVal: typeStr, NameVal: "signalfx/allsettings", Endpoint: "localhost:8080", }, }) + + r2 := cfg.Receivers["signalfx/tls"].(*Config) + assert.Equal(t, r2, + &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: "signalfx/tls", + }, + TLSCredentials: &configtls.TLSSetting{ + CertFile: "/test.crt", + KeyFile: "/test.key", + }, + }) } diff --git a/receiver/signalfxreceiver/factory.go b/receiver/signalfxreceiver/factory.go index e5dd780a173c8..e72ce3718c08d 100644 --- a/receiver/signalfxreceiver/factory.go +++ b/receiver/signalfxreceiver/factory.go @@ -51,7 +51,7 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { func (f *Factory) CreateDefaultConfig() configmodels.Receiver { return &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: configmodels.Type(typeStr), + TypeVal: typeStr, NameVal: typeStr, }, } diff --git a/receiver/signalfxreceiver/receiver.go b/receiver/signalfxreceiver/receiver.go index 493a1b02ac7a0..13cd37b4b6478 100644 --- a/receiver/signalfxreceiver/receiver.go +++ b/receiver/signalfxreceiver/receiver.go @@ -131,8 +131,12 @@ func (r *sfxReceiver) Start(_ context.Context, host component.Host) error { err = nil go func() { - if err = r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - host.ReportFatalError(err) + if r.config.TLSCredentials != nil { + host.ReportFatalError(r.server.ListenAndServeTLS(r.config.TLSCredentials.CertFile, r.config.TLSCredentials.KeyFile)) + } else { + if err = r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + host.ReportFatalError(err) + } } }() }) @@ -154,8 +158,12 @@ func (r *sfxReceiver) Shutdown(context.Context) error { } func (r *sfxReceiver) handleReq(resp http.ResponseWriter, req *http.Request) { - ctx := obsreport.ReceiverContext(req.Context(), r.config.Name(), "http", r.config.Name()) - ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.Name(), "http") + transport := "http" + if r.config.TLSCredentials != nil { + transport = "https" + } + ctx := obsreport.ReceiverContext(req.Context(), r.config.Name(), transport, r.config.Name()) + ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.Name(), transport) if req.Method != http.MethodPost { r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBody, nil) diff --git a/receiver/signalfxreceiver/receiver_test.go b/receiver/signalfxreceiver/receiver_test.go index d834178790466..854258240e377 100644 --- a/receiver/signalfxreceiver/receiver_test.go +++ b/receiver/signalfxreceiver/receiver_test.go @@ -18,8 +18,11 @@ import ( "bytes" "compress/gzip" "context" + "crypto/tls" + "crypto/x509" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "net/http" @@ -30,12 +33,14 @@ import ( metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" "github.com/golang/protobuf/proto" + timestamp "github.com/golang/protobuf/ptypes/timestamp" sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/exporter/exportertest" @@ -160,24 +165,8 @@ func Test_sfxReceiver_handleReq(t *testing.T) { config := (&Factory{}).CreateDefaultConfig().(*Config) config.Endpoint = "localhost:0" // Actually not creating the endpoint - buildSFxMsgFn := func() *sfxpb.DataPointUploadMessage { - return &sfxpb.DataPointUploadMessage{ - Datapoints: []*sfxpb.DataPoint{ - { - Metric: strPtr("single"), - Timestamp: func() *int64 { - l := time.Now().Unix() * 1e3 - return &l - }(), - Value: &sfxpb.Datum{ - IntValue: int64Ptr(13), - }, - MetricType: sfxTypePtr(sfxpb.MetricType_GAUGE), - Dimensions: buildNDimensions(3), - }, - }, - } - } + currentTime := time.Now().Unix() * 1e3 + sFxMsg := buildSFxMsg(¤tTime, 13, 3) tests := []struct { name string @@ -257,8 +246,7 @@ func Test_sfxReceiver_handleReq(t *testing.T) { { name: "msg_accepted", req: func() *http.Request { - sfxMsg := buildSFxMsgFn() - msgBytes, err := proto.Marshal(sfxMsg) + msgBytes, err := proto.Marshal(sFxMsg) require.NoError(t, err) req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) req.Header.Set("Content-Type", "application/x-protobuf") @@ -272,8 +260,7 @@ func Test_sfxReceiver_handleReq(t *testing.T) { { name: "msg_accepted_gzipped", req: func() *http.Request { - sfxMsg := buildSFxMsgFn() - msgBytes, err := proto.Marshal(sfxMsg) + msgBytes, err := proto.Marshal(sFxMsg) require.NoError(t, err) var buf bytes.Buffer @@ -295,8 +282,7 @@ func Test_sfxReceiver_handleReq(t *testing.T) { { name: "bad_gzipped_msg", req: func() *http.Request { - sfxMsg := buildSFxMsgFn() - msgBytes, err := proto.Marshal(sfxMsg) + msgBytes, err := proto.Marshal(sFxMsg) require.NoError(t, err) req := httptest.NewRequest("POST", "http://localhost", bytes.NewReader(msgBytes)) @@ -333,6 +319,120 @@ func Test_sfxReceiver_handleReq(t *testing.T) { } } +func Test_sfxReceiver_TLS(t *testing.T) { + addr := testutils.GetAvailableLocalAddress(t) + cfg := (&Factory{}).CreateDefaultConfig().(*Config) + cfg.Endpoint = addr + cfg.TLSCredentials = &configtls.TLSSetting{ + CertFile: "./testdata/testcert.crt", + KeyFile: "./testdata/testkey.key", + } + sink := new(exportertest.SinkMetricsExporterOld) + r, err := New(zap.NewNop(), *cfg, sink) + require.NoError(t, err) + defer r.Shutdown(context.Background()) + + // NewNopHost swallows errors so using NewErrorWaitingHost to catch any potential errors starting the + // receiver. + mh := componenttest.NewErrorWaitingHost() + require.NoError(t, r.Start(context.Background(), mh), "should not have failed to start metric reception") + + // If there are errors reported through host.ReportFatalError() this will retrieve it. + receivedError, receivedErr := mh.WaitForFatalError(500 * time.Millisecond) + require.NoError(t, receivedErr, "should not have failed to start metric reception") + require.False(t, receivedError) + t.Log("Metric Reception Started") + + msec := time.Now().Unix() * 1e3 + want := consumerdata.MetricsData{ + Metrics: []*metricspb.Metric{ + { + MetricDescriptor: &metricspb.MetricDescriptor{ + Name: "single", + Type: metricspb.MetricDescriptor_GAUGE_INT64, + LabelKeys: []*metricspb.LabelKey{ + {Key: "k0"}, {Key: "k1"}, {Key: "k2"}, + }, + }, + Timeseries: []*metricspb.TimeSeries{ + { + LabelValues: []*metricspb.LabelValue{ + { + Value: "v0", + HasValue: true, + }, + { + Value: "v1", + HasValue: true, + }, + { + Value: "v2", + HasValue: true, + }, + }, + Points: []*metricspb.Point{{ + Timestamp: ×tamp.Timestamp{ + Seconds: msec / 1e3, + Nanos: int32(msec%1e3) * 1e3, + }, + Value: &metricspb.Point_Int64Value{Int64Value: 13}, + }}, + }, + }, + }, + }, + } + + t.Log("Sending SignalFx metric data Request") + + body, err := proto.Marshal(buildSFxMsg(&msec, 13, 3)) + require.NoError(t, err, fmt.Sprintf("failed to marshal SFx message: %v", err)) + + url := fmt.Sprintf("https://%s/v2/datapoint", addr) + + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + require.NoErrorf(t, err, "should have no errors with new request: %v", err) + req.Header.Set("Content-Type", "application/x-protobuf") + + caCert, err := ioutil.ReadFile("./testdata/testcert.crt") + require.NoErrorf(t, err, "failed to load certificate: %v", err) + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + }, + }, + } + + resp, err := client.Do(req) + require.NoErrorf(t, err, "should not have failed when sending to signalFx receiver %v", err) + assert.Equal(t, http.StatusAccepted, resp.StatusCode) + t.Log("SignalFx Request Received") + + got := sink.AllMetrics() + require.Equal(t, 1, len(got)) + assert.Equal(t, want, got[0]) +} + +func buildSFxMsg(time *int64, value int64, dimensions uint) *sfxpb.DataPointUploadMessage { + return &sfxpb.DataPointUploadMessage{ + Datapoints: []*sfxpb.DataPoint{ + { + Metric: strPtr("single"), + Timestamp: time, + Value: &sfxpb.Datum{ + IntValue: int64Ptr(value), + }, + MetricType: sfxTypePtr(sfxpb.MetricType_GAUGE), + Dimensions: buildNDimensions(dimensions), + }, + }, + } +} + type badReqBody struct{} var _ io.ReadCloser = (*badReqBody)(nil) diff --git a/receiver/signalfxreceiver/testdata/config.yaml b/receiver/signalfxreceiver/testdata/config.yaml index db2f6238eb8b0..ea9ddf989a000 100644 --- a/receiver/signalfxreceiver/testdata/config.yaml +++ b/receiver/signalfxreceiver/testdata/config.yaml @@ -4,6 +4,10 @@ receivers: # endpoint specifies the network interface and port which will receive # SignalFx metrics. endpoint: localhost:8080 + signalfx/tls: + tls: + cert_file: /test.crt + key_file: /test.key processors: exampleprocessor: diff --git a/receiver/signalfxreceiver/testdata/testcert.crt b/receiver/signalfxreceiver/testdata/testcert.crt new file mode 100644 index 0000000000000..668c82145c677 --- /dev/null +++ b/receiver/signalfxreceiver/testdata/testcert.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDLDCCAhSgAwIBAgIJAO8ClxUckM5xMA0GCSqGSIb3DQEBCwUAMEsxCzAJBgNV +BAYTAk9UMRYwFAYDVQQIDA1PcGVuVGVsZW1ldHJ5MRAwDgYDVQQHDAdTZXJ2aWNl +MRIwEAYDVQQDDAkxMjcuMC4wLjEwHhcNMjAwNTA2MDAzMTQ0WhcNMjEwNTA2MDAz +MTQ0WjBLMQswCQYDVQQGEwJPVDEWMBQGA1UECAwNT3BlblRlbGVtZXRyeTEQMA4G +A1UEBwwHU2VydmljZTESMBAGA1UEAwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f0 +3kUvDpAUjGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3p +SSZVZmsIRRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI +9o47WSeyVALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlI +gIPFEOh68oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DM +qWj4gKC4CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABoxMwETAPBgNV +HREECDAGhwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQA2DT7VxpFLCqc0FHW16yZ9 +/9fQChEN5IBe6mjbysjeYhef8w7xP03wl6SwFXFpTQDFqz6KdolH+Zl4gm6MsrsX +nQNP8fkHfNtHr+28JPDPWC5aWx5xfdQ4ybfeEV+xdNmrKeKGOcQ+Nmjcx06ysy0J +OSjOBwyTvlCIOF5AnafFuCk81EnbMLQeIvsfudXTAi0aw6HMmS2UgmGUcwGsFDRt +Xx1n1nLclK73f2a9mrzrh1s3lMom1FLaZ8fRecB9cVZJAHdw7RDtkG0qFf+v9Hk0 +y8JKVfSmRIveHz62a/61T36VSKIO0qksdR4wEPCjADsdpagVMrwjyu8qShLFXRkn +-----END CERTIFICATE----- diff --git a/receiver/signalfxreceiver/testdata/testkey.key b/receiver/signalfxreceiver/testdata/testkey.key new file mode 100644 index 0000000000000..b83561755f08d --- /dev/null +++ b/receiver/signalfxreceiver/testdata/testkey.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f03kUvDpAU +jGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3pSSZVZmsI +RRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI9o47WSey +VALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlIgIPFEOh6 +8oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DMqWj4gKC4 +CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABAoIBAG/6OMw8K2By4ObZ +JSpiFd87NlyXejsOCvIKGuAlrYlDqdzLvuImRO4XA0k3mLDVLeMKTeVqgbLo7vco ++c18ptNTkaxPBdcmZtPU0JXd9re9HpsMxVjI/1fA6M9yeWSE3XPafGpYVFcig140 +u8ahsmG/8JjU/KME6DS6Vg8dFsgrb0aJypv1VxA2n4xeWkbwVdniTlny4LFhwvvk +P4dgnGNhUHkeet+re9YZEkXJbNp9hXP/wdEwLJ5tUNI/exngxQaAJ6fP2e1htFoJ +a8TGTwjEwRIAocjI8ieHnUO4lARF8uhEzhSadtZNZvKgbNpE5xJNl9OH9pkQTjo2 +4CLOxpECgYEA8ZbV2o1EcdCKvIWAYhYbBHYe2gadcnO04gtA1bbGk/+YtzaiS20W +zq2qKDhrQJ6sCFe17CN9hM0gdkpsyS1nLtR3F3MVacYe8STh7BD61E5ens+wDWJU +zva3bS1CRMPthZ+i0xc4AOgXDZewD1rxb8rvT5SeXYRZGkvyh9TAzocCgYEA1VVC +lLy3EG/geiMAWd+FnTwWnMViGb8rDY03vJDSKEes9b8HqwEPsX0I3TVeyyYBM3Te +sPYqJxBFh5afU8DYhNue9UbJKSN4j1jg43pK58i99LT+lGRAVaJZLVbsWiEF3MwX +8muCtELAT7j9648oEUX4sF/nnz422Q0VPRS5s7cCgYAqrDHqALnuQJ/A3PPoX282 +QocAi9qTtMxmgQZauYYp7iPTeNsB56r3psU/hXesWlqYvqVrqHkrU/A/9LVyc4qe +QvkmMzW9ETm17oXZZMZpac5czuKR+qRwSjPsHOpvqwvxZlkkYB2MS3KG/BwlGjM7 +Q+UxcVbnvdDfTDrysym7UQKBgQDFeVPVjM7EZ1tak7XKe68aOjoQSmIhxSTcOYGD +imcPJDIFlRxK/gOB32TqJ3IlCHwKHr/Y/TVNzbEe7p1zkMqcSRPepfSloREDWFls +GJLn9ZlowHX79MTcwBhecNz+HR1pIn90RnLJ3BRad7qMZ4rGWof28//bF2L8DjE/ +xkSUBwKBgQCAki3+tscHa1ZQ+VZmvTeQ9gwNZt+cm0FzavhjnvWZAKCrKwg+MvAN +BifZjKsPK7u9u1QGuPP0Zn/Zw3VoMlrC/Paa/OZzwTtb2yN824wbMo0Qvm5WciS7 +DXyNy887h1NLGEyMh6rGUutmI6OPf8WaLqcxrz16dhtr9+N6YDZ/tQ== +-----END RSA PRIVATE KEY-----