Skip to content

Commit

Permalink
Change signalfx receiver to the new interfaces (#747)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent 8443f8e commit 3a877ea
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func components() (component.Factories, error) {
receivers := []component.ReceiverFactoryBase{
&collectdreceiver.Factory{},
sapmreceiver.NewFactory(),
&signalfxreceiver.Factory{},
signalfxreceiver.NewFactory(),
&carbonreceiver.Factory{},
&wavefrontreceiver.Factory{},
redisreceiver.NewFactory(),
Expand Down
2 changes: 1 addition & 1 deletion receiver/signalfxreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
Expand Down
47 changes: 13 additions & 34 deletions receiver/signalfxreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"strconv"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// This file implements factory for SignalFx receiver.
Expand All @@ -38,24 +37,15 @@ const (
defaultEndpoint = ":9943"
)

// Factory is the factory for SignalFx receiver.
type Factory struct {
// NewFactory creates a factory for SignalFx receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
}

var _ component.ReceiverFactoryOld = (*Factory)(nil)

// Type gets the type of the Receiver config created by this factory.
func (f *Factory) Type() configmodels.Type {
return configmodels.Type(typeStr)
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this config.
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return nil
}

// CreateDefaultConfig creates the default configuration for SignalFx receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
Expand Down Expand Up @@ -93,23 +83,12 @@ func (rCfg *Config) validate() error {
return nil
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
consumer consumer.TraceConsumerOld,
) (component.TraceReceiver, error) {

return nil, configerror.ErrDataTypeIsNotSupported
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(
ctx context.Context,
logger *zap.Logger,
func createMetricsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {

rCfg := cfg.(*Config)
Expand All @@ -119,5 +98,5 @@ func (f *Factory) CreateMetricsReceiver(
return nil, err
}

return New(logger, *rCfg, consumer)
return New(params.Logger, *rCfg, consumer)
}
44 changes: 15 additions & 29 deletions receiver/signalfxreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,59 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

type mockMetricsConsumer struct {
}

var _ (consumer.MetricsConsumerOld) = (*mockMetricsConsumer)(nil)

func (m *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
return nil
}

func TestCreateReceiver(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "localhost:1" // Endpoint is required, not going to be used here.

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

tReceiver, err = factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

mReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestCreateInvalidHTTPEndpoint(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = ""

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.Error(t, err, "endpoint is not formatted correctly: missing port in address")
assert.Nil(t, tReceiver)
}

func TestCreateNoPort(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "localhost:"

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.Error(t, err, "endpoint port is not a number: strconv.ParseInt: parsing \"\": invalid syntax")
assert.Nil(t, tReceiver)
}

func TestCreateLargePort(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "localhost:65536"

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.Error(t, err, "port number must be between 1 and 65535")
assert.Nil(t, tReceiver)
}
10 changes: 6 additions & 4 deletions receiver/signalfxreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"
Expand Down Expand Up @@ -79,7 +81,7 @@ type sfxReceiver struct {
sync.Mutex
logger *zap.Logger
config *Config
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer
server *http.Server

startOnce sync.Once
Expand All @@ -92,7 +94,7 @@ var _ component.MetricsReceiver = (*sfxReceiver)(nil)
func New(
logger *zap.Logger,
config Config,
nextConsumer consumer.MetricsConsumerOld,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {

if nextConsumer == nil {
Expand Down Expand Up @@ -216,7 +218,7 @@ func (r *sfxReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
return
}

md, _ := SignalFxV2ToMetricsData(r.logger, msg.Datapoints)
md, _ := signalFxV2ToMetricsData(r.logger, msg.Datapoints)

if r.config.AccessTokenPassthrough {
if accessToken := req.Header.Get(splunk.SFxAccessTokenHeader); accessToken != "" {
Expand All @@ -230,7 +232,7 @@ func (r *sfxReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
}
}

err = r.nextConsumer.ConsumeMetricsData(ctx, *md)
err = r.nextConsumer.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}))
obsreport.EndMetricsReceiveOp(
ctx,
typeStr,
Expand Down
44 changes: 24 additions & 20 deletions receiver/signalfxreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ import (
)

func Test_signalfxeceiver_New(t *testing.T) {
defaultConfig := (&Factory{}).CreateDefaultConfig().(*Config)
defaultConfig := createDefaultConfig().(*Config)
type args struct {
config Config
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer
}
tests := []struct {
name string
Expand All @@ -74,7 +74,7 @@ func Test_signalfxeceiver_New(t *testing.T) {
name: "default_endpoint",
args: args{
config: *defaultConfig,
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
{
Expand All @@ -85,7 +85,7 @@ func Test_signalfxeceiver_New(t *testing.T) {
Endpoint: "localhost:1234",
},
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
}
Expand All @@ -105,9 +105,9 @@ func Test_signalfxeceiver_New(t *testing.T) {
func Test_signalfxeceiver_EndToEnd(t *testing.T) {
port := testutil.GetAvailablePort(t)
addr := fmt.Sprintf("localhost:%d", port)
cfg := (&Factory{}).CreateDefaultConfig().(*Config)
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = addr
sink := new(exportertest.SinkMetricsExporterOld)
sink := new(exportertest.SinkMetricsExporter)
r, err := New(zap.NewNop(), *cfg, sink)
require.NoError(t, err)

Expand Down Expand Up @@ -159,16 +159,18 @@ func Test_signalfxeceiver_EndToEnd(t *testing.T) {
}
}

got := sink.AllMetrics()
require.Equal(t, 1, len(got))
mds := sink.AllMetrics()
require.Len(t, mds, 1)
got := pdatautil.MetricsToMetricsData(mds[0])
require.Len(t, got, 1)
assert.Equal(t, want, got[0])

assert.NoError(t, r.Shutdown(context.Background()))
assert.Equal(t, componenterror.ErrAlreadyStopped, r.Shutdown(context.Background()))
}

func Test_sfxReceiver_handleReq(t *testing.T) {
config := (&Factory{}).CreateDefaultConfig().(*Config)
config := createDefaultConfig().(*Config)
config.Endpoint = "localhost:0" // Actually not creating the endpoint

currentTime := time.Now().Unix() * 1e3
Expand Down Expand Up @@ -305,8 +307,7 @@ func Test_sfxReceiver_handleReq(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(exportertest.SinkMetricsExporterOld)
rcv, err := New(zap.NewNop(), *config, sink)
rcv, err := New(zap.NewNop(), *config, exportertest.NewNopMetricsExporter())
assert.NoError(t, err)

r := rcv.(*sfxReceiver)
Expand All @@ -327,15 +328,15 @@ func Test_sfxReceiver_handleReq(t *testing.T) {

func Test_sfxReceiver_TLS(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
cfg := (&Factory{}).CreateDefaultConfig().(*Config)
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = addr
cfg.HTTPServerSettings.TLSSetting = &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "./testdata/testcert.crt",
KeyFile: "./testdata/testkey.key",
},
}
sink := new(exportertest.SinkMetricsExporterOld)
sink := new(exportertest.SinkMetricsExporter)
r, err := New(zap.NewNop(), *cfg, sink)
require.NoError(t, err)
defer r.Shutdown(context.Background())
Expand Down Expand Up @@ -421,8 +422,10 @@ func Test_sfxReceiver_TLS(t *testing.T) {
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
t.Log("SignalFx Request Received")

got := sink.AllMetrics()
require.Equal(t, 1, len(got))
mds := sink.AllMetrics()
require.Len(t, mds, 1)
got := pdatautil.MetricsToMetricsData(mds[0])
require.Len(t, got, 1)
assert.Equal(t, want, got[0])
}

Expand Down Expand Up @@ -456,11 +459,11 @@ func Test_sfxReceiver_AccessTokenPassthrough(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config := (&Factory{}).CreateDefaultConfig().(*Config)
config := createDefaultConfig().(*Config)
config.Endpoint = "localhost:0"
config.AccessTokenPassthrough = tt.passthrough

sink := new(exportertest.SinkMetricsExporterOld)
sink := new(exportertest.SinkMetricsExporter)
rcv, err := New(zap.NewNop(), *config, sink)
assert.NoError(t, err)

Expand All @@ -487,9 +490,10 @@ func Test_sfxReceiver_AccessTokenPassthrough(t *testing.T) {
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
assert.Equal(t, responseOK, bodyStr)

got := sink.AllMetrics()
require.Equal(t, 1, len(got))

mds := sink.AllMetrics()
require.Len(t, mds, 1)
got := pdatautil.MetricsToMetricsData(mds[0])
require.Len(t, got, 1)
tokenLabel := ""
if got[0].Resource != nil && got[0].Resource.Labels != nil {
tokenLabel = got[0].Resource.Labels["com.splunk.signalfx.access_token"]
Expand Down
8 changes: 4 additions & 4 deletions receiver/signalfxreceiver/signalfxv2_to_metricdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ var (
errSFxNoDatumValue = errors.New("no datum value present for data-point")
)

// SignalFxV2ToMetricsData converts SignalFx proto data points to
// signalFxV2ToMetricsData converts SignalFx proto data points to
// consumerdata.MetricsData. Returning the converted data and the number of
// dropped time series.
func SignalFxV2ToMetricsData(
func signalFxV2ToMetricsData(
logger *zap.Logger,
sfxDataPoints []*sfxpb.DataPoint,
) (*consumerdata.MetricsData, int) {
) (consumerdata.MetricsData, int) {

// TODO: not optimized at all, basically regenerating everything for each
// data point.
numDroppedTimeSeries := 0
md := &consumerdata.MetricsData{}
md := consumerdata.MetricsData{}
metrics := make([]*metricspb.Metric, 0, len(sfxDataPoints))
for _, sfxDataPoint := range sfxDataPoints {
if sfxDataPoint == nil {
Expand Down
Loading

0 comments on commit 3a877ea

Please sign in to comment.