Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TLS for SAPM and SignalFx receiver #215

Merged
merged 4 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add TLS for SAPM and SignalFx receiver
  • Loading branch information
ccaraman committed May 29, 2020
commit a865a18656c0ff7c06c65f1190a233ef535209be
7 changes: 7 additions & 0 deletions receiver/sapmreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ Example:
receivers:
sapm:
endpoint: localhost:7276
tls_credentials:
cert_file: /test.crt
key_file: /test.key
```

* `endpoint`: Address and port that the SAPM receiver should bind to. Note that this must be 0.0.0.0:<port> instead of localhost if you want to receive spans from sources exporting to IPs other than localhost on the same host. For example, when the collector is deployed as a k8s deployment and exposed using a service.
* `tls_crendentials`: This is an optional object used to specify if TLS should be used for incoming connections.
* `cert_file`: Specifies the certificate file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.
* `key_file`: Specifies the key file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.

4 changes: 2 additions & 2 deletions receiver/sapmreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package sapmreceiver

import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/receiver"
)

// Config defines configuration for SAPM receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
receiver.SecureReceiverSettings `mapstructure:",squash"`
}
30 changes: 25 additions & 5 deletions receiver/sapmreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/receiver"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -37,18 +38,37 @@ func TestLoadConfig(t *testing.T) {

// The receiver `sapm/disabled` doesn't count because disabled receivers
// are excluded from the final list.
assert.Equal(t, len(cfg.Receivers), 2)
assert.Equal(t, len(cfg.Receivers), 3)

r0 := cfg.Receivers["sapm"]
assert.Equal(t, r0, factory.CreateDefaultConfig())

r1 := cfg.Receivers["sapm/customname"].(*Config)
assert.Equal(t, r1,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: "sapm/customname",
Endpoint: "0.0.0.0:7276",
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "sapm/customname",
Endpoint: "0.0.0.0:7276",
},
},
})

r2 := cfg.Receivers["sapm/tls"].(*Config)
assert.Equal(t, r2,
&Config{
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "sapm/tls",
Endpoint: ":7276",
},
TLSCredentials: &receiver.TLSCredentials{
CertFile: "/test.crt",
KeyFile: "/test.key",
},

},
})
}
11 changes: 7 additions & 4 deletions receiver/sapmreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

const (
Expand Down Expand Up @@ -53,10 +54,12 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
// CreateDefaultConfig creates the default configuration for SAPM receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
Endpoint: defaultEndpoint,
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
Endpoint: defaultEndpoint,
},
},
}
}
Expand Down
9 changes: 8 additions & 1 deletion receiver/sapmreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ receivers:
# Ex: `endpoint: "7276"` is incorrect.
# Ex: `endpoint: "1.2.3.4:7276"` and ":7276" is correct
sapm/customname:
endpoint: "0.0.0.0:7276"
endpoint: "0.0.0.0:7276"

# The following demonstrates how to specify TLS for the receiver.
sapm/tls:
tls_credentials:
cert_file: /test.crt
key_file: /test.key


processors:
exampleprocessor:
Expand Down
19 changes: 19 additions & 0 deletions receiver/sapmreceiver/testdata/testcert.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDLDCCAhSgAwIBAgIJAO8ClxUckM5xMA0GCSqGSIb3DQEBCwUAMEsxCzAJBgNV
BAYTAk9UMRYwFAYDVQQIDA1PcGVuVGVsZW1ldHJ5MRAwDgYDVQQHDAdTZXJ2aWNl
MRIwEAYDVQQDDAkxMjcuMC4wLjEwHhcNMjAwNTA2MDAzMTQ0WhcNMjEwNTA2MDAz
MTQ0WjBLMQswCQYDVQQGEwJPVDEWMBQGA1UECAwNT3BlblRlbGVtZXRyeTEQMA4G
A1UEBwwHU2VydmljZTESMBAGA1UEAwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f0
3kUvDpAUjGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3p
SSZVZmsIRRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI
9o47WSeyVALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlI
gIPFEOh68oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DM
qWj4gKC4CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABoxMwETAPBgNV
HREECDAGhwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQA2DT7VxpFLCqc0FHW16yZ9
/9fQChEN5IBe6mjbysjeYhef8w7xP03wl6SwFXFpTQDFqz6KdolH+Zl4gm6MsrsX
nQNP8fkHfNtHr+28JPDPWC5aWx5xfdQ4ybfeEV+xdNmrKeKGOcQ+Nmjcx06ysy0J
OSjOBwyTvlCIOF5AnafFuCk81EnbMLQeIvsfudXTAi0aw6HMmS2UgmGUcwGsFDRt
Xx1n1nLclK73f2a9mrzrh1s3lMom1FLaZ8fRecB9cVZJAHdw7RDtkG0qFf+v9Hk0
y8JKVfSmRIveHz62a/61T36VSKIO0qksdR4wEPCjADsdpagVMrwjyu8qShLFXRkn
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions receiver/sapmreceiver/testdata/testkey.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAyVL12RmIXJP/K1wkGDW2k9OKvmP/kX0w2iPEZ7f03kUvDpAU
jGHr3FnnOYYmxQU+gXK0g/pRupNh5n4GuzFGbQNEAl1V+dmAB5w/yX3pSSZVZmsI
RRgTVRd1501mDNqRS6aOy8+PhFUF7rI6V5KBaizLch6ojOtDSouUTnRI9o47WSey
VALHFllm1BYlCBoiSZ/ZX2BJOteJ/9fawPQzVMSmNOQcymRiHqeBlhlIgIPFEOh6
8oJScGNRVin0tnxfUr7eO+cR3iC20aCUXsL3Yq1KKXrxD3/0T8ICK5DMqWj4gKC4
CVlIF68zvVWD/jKSR9u9Gs+aRxA2ZHF+scsHgQIDAQABAoIBAG/6OMw8K2By4ObZ
JSpiFd87NlyXejsOCvIKGuAlrYlDqdzLvuImRO4XA0k3mLDVLeMKTeVqgbLo7vco
+c18ptNTkaxPBdcmZtPU0JXd9re9HpsMxVjI/1fA6M9yeWSE3XPafGpYVFcig140
u8ahsmG/8JjU/KME6DS6Vg8dFsgrb0aJypv1VxA2n4xeWkbwVdniTlny4LFhwvvk
P4dgnGNhUHkeet+re9YZEkXJbNp9hXP/wdEwLJ5tUNI/exngxQaAJ6fP2e1htFoJ
a8TGTwjEwRIAocjI8ieHnUO4lARF8uhEzhSadtZNZvKgbNpE5xJNl9OH9pkQTjo2
4CLOxpECgYEA8ZbV2o1EcdCKvIWAYhYbBHYe2gadcnO04gtA1bbGk/+YtzaiS20W
zq2qKDhrQJ6sCFe17CN9hM0gdkpsyS1nLtR3F3MVacYe8STh7BD61E5ens+wDWJU
zva3bS1CRMPthZ+i0xc4AOgXDZewD1rxb8rvT5SeXYRZGkvyh9TAzocCgYEA1VVC
lLy3EG/geiMAWd+FnTwWnMViGb8rDY03vJDSKEes9b8HqwEPsX0I3TVeyyYBM3Te
sPYqJxBFh5afU8DYhNue9UbJKSN4j1jg43pK58i99LT+lGRAVaJZLVbsWiEF3MwX
8muCtELAT7j9648oEUX4sF/nnz422Q0VPRS5s7cCgYAqrDHqALnuQJ/A3PPoX282
QocAi9qTtMxmgQZauYYp7iPTeNsB56r3psU/hXesWlqYvqVrqHkrU/A/9LVyc4qe
QvkmMzW9ETm17oXZZMZpac5czuKR+qRwSjPsHOpvqwvxZlkkYB2MS3KG/BwlGjM7
Q+UxcVbnvdDfTDrysym7UQKBgQDFeVPVjM7EZ1tak7XKe68aOjoQSmIhxSTcOYGD
imcPJDIFlRxK/gOB32TqJ3IlCHwKHr/Y/TVNzbEe7p1zkMqcSRPepfSloREDWFls
GJLn9ZlowHX79MTcwBhecNz+HR1pIn90RnLJ3BRad7qMZ4rGWof28//bF2L8DjE/
xkSUBwKBgQCAki3+tscHa1ZQ+VZmvTeQ9gwNZt+cm0FzavhjnvWZAKCrKwg+MvAN
BifZjKsPK7u9u1QGuPP0Zn/Zw3VoMlrC/Paa/OZzwTtb2yN824wbMo0Qvm5WciS7
DXyNy887h1NLGEyMh6rGUutmI6OPf8WaLqcxrz16dhtr9+N6YDZ/tQ==
-----END RSA PRIVATE KEY-----
14 changes: 11 additions & 3 deletions receiver/sapmreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func (sr *sapmReceiver) handleRequest(ctx context.Context, req *http.Request) er
return err
}

ctx = obsreport.ReceiverContext(ctx, sr.config.Name(), "http", "")
ctx = obsreport.StartTraceDataReceiveOp(ctx, sr.config.Name(), "http")
transport := "http"
if sr.config.TLSCredentials != nil {
transport = "https"
}
ctx = obsreport.ReceiverContext(ctx, sr.config.Name(), transport, "")
ctx = obsreport.StartTraceDataReceiveOp(ctx, sr.config.Name(), transport)

td := jaegertranslator.ProtoBatchesToInternalTraces(sapm.Batches)

Expand Down Expand Up @@ -170,7 +174,11 @@ func (sr *sapmReceiver) Start(_ context.Context, host component.Host) error {

// run the server on a routine
go func() {
host.ReportFatalError(sr.server.Serve(ln))
if sr.config.TLSCredentials != nil {
host.ReportFatalError(sr.server.ServeTLS(ln, sr.config.TLSCredentials.CertFile, sr.config.TLSCredentials.KeyFile))
} else {
host.ReportFatalError(sr.server.Serve(ln))
}
}()
})
return err
Expand Down
77 changes: 69 additions & 8 deletions receiver/sapmreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"
Expand All @@ -30,12 +33,15 @@ import (
splunksapm "github.com/signalfx/sapm-proto/gen"
"github.com/signalfx/sapm-proto/sapmprotocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/testutils"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -134,7 +140,7 @@ func grpcFixture(t1 time.Time, d1, d2 time.Duration) *model.Batch {
}

// sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future.
func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) (*http.Response, error) {
func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, tlsEnabled bool) (*http.Response, error) {
// marshal the sapm
reqBytes, err := proto.Marshal(sapm)
if err != nil {
Expand Down Expand Up @@ -163,7 +169,11 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) (
}

// build the request
req, _ := http.NewRequest(http.MethodPost, fmt.Sprintf("http:https://%s%s", endpoint, sapmprotocol.TraceEndpointV2), bytes.NewReader(reqBytes))
url := fmt.Sprintf("http:https://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
if tlsEnabled {
url = fmt.Sprintf("https://%s%s", endpoint, sapmprotocol.TraceEndpointV2)
}
req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader(reqBytes))
req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)

// set headers for gzip
Expand All @@ -174,9 +184,24 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool) (

// send the request
client := &http.Client{}

if tlsEnabled {
caCert, err := ioutil.ReadFile("./testdata/testcert.crt")
if err != nil {
return nil, fmt.Errorf("failed to load certificate: %s", err.Error())
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
client.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
}
}

resp, err := client.Do(req)
if err != nil {
return resp, fmt.Errorf("failed to send request to receiver %v", resp)
return resp, fmt.Errorf("failed to send request to receiver %v", err)
}

return resp, nil
Expand All @@ -186,11 +211,13 @@ func TestReception(t *testing.T) {
now := time.Unix(1542158650, 536343000).UTC()
nowPlus10min := now.Add(10 * time.Minute)
nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second)
tlsAddress := testutils.GetAvailableLocalAddress(t)

type args struct {
config *Config
sapm *splunksapm.PostSpansRequest
zipped bool
useTLS bool
}
tests := []struct {
name string
Expand All @@ -202,21 +229,46 @@ func TestReception(t *testing.T) {
args: args{
// 1. Create the SAPM receiver aka "server"
config: &Config{
ReceiverSettings: configmodels.ReceiverSettings{Endpoint: defaultEndpoint},
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{Endpoint: defaultEndpoint},
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}},
zipped: false,
useTLS: false,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
{
name: "receive compressed sapm",
args: args{
config: &Config{
ReceiverSettings: configmodels.ReceiverSettings{Endpoint: defaultEndpoint},
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{Endpoint: defaultEndpoint},
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}},
zipped: true,
useTLS: false,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
{
name: "connect via TLS compressed sapm",
args: args{
config: &Config{
SecureReceiverSettings: receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: tlsAddress},
TLSCredentials: &receiver.TLSCredentials{
CertFile: ("./testdata/testcert.crt"),
KeyFile: ("./testdata/testkey.key"),
},
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now, time.Minute*10, time.Second*2)}},
zipped: false,
useTLS: true,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
Expand All @@ -232,13 +284,22 @@ func TestReception(t *testing.T) {
t.Log("Starting")
defer sr.Shutdown(context.Background())

assert.NoError(t, sr.Start(context.Background(), componenttest.NewNopHost()), "should not have failed to start trace reception")
// NewNopHost swallows errors so using NewErrorWaitingHost to catch any potential errors starting the
// receiver.
mh := componenttest.NewErrorWaitingHost()
require.NoError(t, sr.Start(context.Background(), mh), "should not have failed to start trace reception")

// If there are errors reported through host.ReportFatalError() this will retrieve it.
receivedError, receivedErr := mh.WaitForFatalError(500 * time.Millisecond)
require.NoError(t, receivedErr, "should not have failed to start trace reception")
require.False(t, receivedError)
t.Log("Trace Reception Started")

t.Log("Sending Sapm Request")
var resp *http.Response
resp, err = sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped)
assert.NoError(t, err, fmt.Sprintf("should not have failed when sending sapm %v", resp))
resp, err = sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped, tt.args.useTLS)
require.NoError(t, err, fmt.Sprintf("should not have failed when sending sapm %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
require.NoError(t, err, fmt.Sprintf("should not have failed when sending sapm %v", err))
require.NoErrorf(t, err, "should not have failed when sending sapm %v", err)

assert.Equal(t, 200, resp.StatusCode)
t.Log("SAPM Request Received")

// retrieve received traces
Expand Down
23 changes: 23 additions & 0 deletions receiver/signalfxreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SignalFx Receiver

The SignalFx receiver accepts metrics in the [SignalFx proto format](https://github.com/signalfx/com_signalfx_metrics_protobuf).
This allows the collector to receiver metrics from other collectors or the
SignalFx Smart Agent.

## Configuration

Example:

```yaml
receivers:
signalfx:
endpoint: localhost:7276
tls_credentials:
cert_file: /test.crt
key_file: /test.key
```

* `endpoint`: Address and port that the SignalFx receiver should bind to. Note that this must be 0.0.0.0:<port> instead of localhost if you want to receive spans from sources exporting to IPs other than localhost on the same host. For example, when the collector is deployed as a k8s deployment and exposed using a service.
* `tls_crendentials`: This is an optional object used to specify if TLS should be used for incoming connections.
* `cert_file`: Specifies the certificate file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.
* `key_file`: Specifies the key file to use for TLS connection. Note: Both `key_file` and `cert_file` are required for TLS connection.
4 changes: 2 additions & 2 deletions receiver/signalfxreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package signalfxreceiver

import "go.opentelemetry.io/collector/config/configmodels"
import "go.opentelemetry.io/collector/receiver"

// Config defines configuration for the SignalFx receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
receiver.SecureReceiverSettings `mapstructure:",squash"`
}
Loading