Skip to content

Commit

Permalink
Update carbon and wavefront to the new interface (#744)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent 4c56271 commit 660bbfb
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 215 deletions.
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func components() (component.Factories, error) {
collectdreceiver.NewFactory(),
sapmreceiver.NewFactory(),
signalfxreceiver.NewFactory(),
&carbonreceiver.Factory{},
&wavefrontreceiver.Factory{},
carbonreceiver.NewFactory(),
wavefrontreceiver.NewFactory(),
redisreceiver.NewFactory(),
kubeletstatsreceiver.NewFactory(),
simpleprometheusreceiver.NewFactory(),
Expand Down
2 changes: 1 addition & 1 deletion receiver/carbonreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
Expand Down
102 changes: 41 additions & 61 deletions receiver/carbonreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import (

"github.com/spf13/viper"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"go.opentelemetry.io/collector/receiver/receiverhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
Expand All @@ -37,57 +36,50 @@ const (
typeStr = "carbon"
)

// Factory is the factory for carbon receiver.
type Factory struct {
// NewFactory creates a factory for Carbon receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithCustomUnmarshaler(customUnmarshaler),
receiverhelper.WithMetrics(createMetricsReceiver))
}

var _ component.ReceiverFactoryOld = (*Factory)(nil)

// Type gets the type of the Receiver config created by this factory.
func (f *Factory) Type() configmodels.Type {
return configmodels.Type(typeStr)
}

// CustomUnmarshaler returns the custom function to handle the special settings
// used on the receiver.
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return func(sourceViperSection *viper.Viper, intoCfg interface{}) error {
if sourceViperSection == nil {
// The section is empty nothing to do, using the default config.
return nil
}

// Unmarshal but not exact yet so the different keys under config do not
// trigger errors, this is needed so that the types of protocol and transport
// are read.
if err := sourceViperSection.Unmarshal(intoCfg); err != nil {
return err
}
func customUnmarshaler(sourceViperSection *viper.Viper, intoCfg interface{}) error {
if sourceViperSection == nil {
// The section is empty nothing to do, using the default config.
return nil
}

// Unmarshal the protocol, so the type of config can be properly set.
rCfg := intoCfg.(*Config)
vParserCfg := sourceViperSection.Sub(parserConfigSection)
if vParserCfg != nil {
if err := protocol.LoadParserConfig(vParserCfg, rCfg.Parser); err != nil {
return fmt.Errorf(
"error on %q section for %s: %v",
parserConfigSection,
rCfg.Name(),
err)
}
}
// Unmarshal but not exact yet so the different keys under config do not
// trigger errors, this is needed so that the types of protocol and transport
// are read.
if err := sourceViperSection.Unmarshal(intoCfg); err != nil {
return err
}

// Unmarshal exact to validate the config keys.
if err := sourceViperSection.UnmarshalExact(intoCfg); err != nil {
return err
// Unmarshal the protocol, so the type of config can be properly set.
rCfg := intoCfg.(*Config)
vParserCfg := sourceViperSection.Sub(parserConfigSection)
if vParserCfg != nil {
if err := protocol.LoadParserConfig(vParserCfg, rCfg.Parser); err != nil {
return fmt.Errorf(
"error on %q section for %s: %v",
parserConfigSection,
rCfg.Name(),
err)
}
}

return nil
// Unmarshal exact to validate the config keys.
if err := sourceViperSection.UnmarshalExact(intoCfg); err != nil {
return err
}

return nil
}

// CreateDefaultConfig creates the default configuration for Carbon receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
Expand All @@ -105,25 +97,13 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
consumer consumer.TraceConsumerOld,
) (component.TraceReceiver, error) {

return nil, configerror.ErrDataTypeIsNotSupported
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(
ctx context.Context,
logger *zap.Logger,
func createMetricsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {

rCfg := cfg.(*Config)
return New(logger, *rCfg, consumer)
return New(params.Logger, *rCfg, consumer)
}
32 changes: 7 additions & 25 deletions receiver/carbonreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,25 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

type mockMetricsConsumer struct {
}

var _ (consumer.MetricsConsumerOld) = (*mockMetricsConsumer)(nil)

func (m *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
return nil
}

func TestCreateReceiver(t *testing.T) {
factory := &Factory{}
cfg := factory.CreateDefaultConfig().(*Config)
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:0" // Endpoint is required, not going to be used here.

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

tReceiver, err = factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
assert.Nil(t, err, "receiver creation failed")
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := createMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.NoError(t, err)
assert.NotNil(t, tReceiver, "receiver creation failed")

mReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}
4 changes: 2 additions & 2 deletions receiver/carbonreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type carbonReceiver struct {
server transport.Server
reporter transport.Reporter
parser protocol.Parser
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer

startOnce sync.Once
stopOnce sync.Once
Expand All @@ -57,7 +57,7 @@ var _ component.MetricsReceiver = (*carbonReceiver)(nil)
func New(
logger *zap.Logger,
config Config,
nextConsumer consumer.MetricsConsumerOld,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {

if nextConsumer == nil {
Expand Down
31 changes: 17 additions & 14 deletions receiver/carbonreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/testutil"
"go.uber.org/zap"
Expand All @@ -40,10 +41,10 @@ import (
)

func Test_carbonreceiver_New(t *testing.T) {
defaultConfig := (&Factory{}).CreateDefaultConfig().(*Config)
defaultConfig := createDefaultConfig().(*Config)
type args struct {
config Config
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer
}
tests := []struct {
name string
Expand All @@ -54,7 +55,7 @@ func Test_carbonreceiver_New(t *testing.T) {
name: "default_config",
args: args{
config: *defaultConfig,
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
{
Expand All @@ -68,7 +69,7 @@ func Test_carbonreceiver_New(t *testing.T) {
},
TCPIdleTimeout: defaultConfig.TCPIdleTimeout,
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
{
Expand All @@ -84,7 +85,7 @@ func Test_carbonreceiver_New(t *testing.T) {
config: Config{
ReceiverSettings: configmodels.ReceiverSettings{},
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
wantErr: errEmptyEndpoint,
},
Expand All @@ -104,7 +105,7 @@ func Test_carbonreceiver_New(t *testing.T) {
Config: &protocol.PlaintextConfig{},
},
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
wantErr: errors.New("unsupported transport \"unknown_transp\" for receiver \"invalid_transport_rcv\""),
},
Expand All @@ -130,7 +131,7 @@ func Test_carbonreceiver_New(t *testing.T) {
},
},
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
{
Expand All @@ -150,7 +151,7 @@ func Test_carbonreceiver_New(t *testing.T) {
Config: &protocol.PlaintextConfig{},
},
},
nextConsumer: new(exportertest.SinkMetricsExporterOld),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
wantErr: errors.New("invalid idle timeout: -1s"),
},
Expand Down Expand Up @@ -184,7 +185,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
{
name: "default_config",
configFn: func() *Config {
return (&Factory{}).CreateDefaultConfig().(*Config)
return createDefaultConfig().(*Config)
},
clientFn: func(t *testing.T) *client.Graphite {
c, err := client.NewGraphite(client.TCP, host, port)
Expand All @@ -195,7 +196,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
{
name: "default_config_udp",
configFn: func() *Config {
cfg := (&Factory{}).CreateDefaultConfig().(*Config)
cfg := createDefaultConfig().(*Config)
cfg.Transport = "udp"
return cfg
},
Expand All @@ -210,7 +211,7 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cfg := tt.configFn()
cfg.Endpoint = addr
sink := new(exportertest.SinkMetricsExporterOld)
sink := new(exportertest.SinkMetricsExporter)
rcv, err := New(zap.NewNop(), *cfg, sink)
require.NoError(t, err)
r := rcv.(*carbonReceiver)
Expand All @@ -237,9 +238,11 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
mr.WaitAllOnMetricsProcessedCalls()

mdd := sink.AllMetrics()
require.Equal(t, 1, len(mdd))
require.Equal(t, 1, len(mdd[0].Metrics))
metric := mdd[0].Metrics[0]
require.Len(t, mdd, 1)
ocmd := pdatautil.MetricsToMetricsData(mdd[0])
require.Len(t, ocmd, 1)
require.Len(t, ocmd[0].Metrics, 1)
metric := ocmd[0].Metrics[0]
assert.Equal(t, carbonMetric.Name, metric.GetMetricDescriptor().GetName())
tss := metric.GetTimeseries()
require.Equal(t, 1, len(tss))
Expand Down
2 changes: 1 addition & 1 deletion receiver/carbonreceiver/transport/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Server interface {
// the Parser and passed to the next consumer.
ListenAndServe(
p protocol.Parser,
mc consumer.MetricsConsumerOld,
mc consumer.MetricsConsumer,
r Reporter,
) error

Expand Down
Loading

0 comments on commit 660bbfb

Please sign in to comment.