Skip to content

Commit

Permalink
Change collectd receiver to the new interfaces (#748)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent 3a877ea commit 41311cb
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 126 deletions.
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func components() (component.Factories, error) {
}

receivers := []component.ReceiverFactoryBase{
&collectdreceiver.Factory{},
collectdreceiver.NewFactory(),
sapmreceiver.NewFactory(),
signalfxreceiver.NewFactory(),
&carbonreceiver.Factory{},
Expand Down
2 changes: 1 addition & 1 deletion receiver/collectdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Receivers[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), factories,
Expand Down
48 changes: 13 additions & 35 deletions receiver/collectdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// This file implements factory for CollectD receiver.
Expand All @@ -37,24 +36,14 @@ const (
defaultEncodingFormat = "json"
)

// Factory is the factory for collectd receiver.
type Factory struct {
// NewFactory creates a factory for collectd receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
}

var _ component.ReceiverFactoryOld = &Factory{}

// Type gets the type of the Receiver config created by this factory.
func (f *Factory) Type() configmodels.Type {
return configmodels.Type(typeStr)
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this config.
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return nil
}

// CreateDefaultConfig creates the default configuration for CollectD receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
Expand All @@ -68,22 +57,11 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumerOld,
) (component.TraceReceiver, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(
ctx context.Context,
logger *zap.Logger,
func createMetricsReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumerOld,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
c := cfg.(*Config)
c.Encoding = strings.ToLower(c.Encoding)
Expand All @@ -95,5 +73,5 @@ func (f *Factory) CreateMetricsReceiver(
c.Encoding,
)
}
return New(logger, c.Endpoint, c.Timeout, c.AttributesPrefix, nextConsumer)
return newCollectdReceiver(params.Logger, c.Endpoint, c.Timeout, c.AttributesPrefix, nextConsumer)
}
28 changes: 7 additions & 21 deletions receiver/collectdreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,25 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

type mockMetricsConsumer struct {
}

func (m *mockMetricsConsumer) ConsumeMetricsData(ctx context.Context, td consumerdata.MetricsData) error {
return nil
}

func TestCreateReceiver(t *testing.T) {
factory := &Factory{}
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

tReceiver, err := factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
assert.Nil(t, err, "receiver creation failed")
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
tReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.NoError(t, err)
assert.NotNil(t, tReceiver, "receiver creation failed")

tReceiver, err = factory.CreateMetricsReceiver(context.Background(), zap.NewNop(), cfg, &mockMetricsConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

mReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}
1 change: 1 addition & 0 deletions receiver/collectdreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ require (
go.opentelemetry.io/collector v0.8.1-0.20200818003535-ed65294c9886
go.uber.org/zap v1.15.0
google.golang.org/grpc/examples v0.0.0-20200728194956-1c32b02682df // indirect
google.golang.org/protobuf v1.25.0
)
11 changes: 6 additions & 5 deletions receiver/collectdreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.uber.org/zap"
)

Expand All @@ -46,19 +47,19 @@ type collectdReceiver struct {
addr string
server *http.Server
defaultAttrsPrefix string
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer

startOnce sync.Once
stopOnce sync.Once
}

// New creates the CollectD receiver with the given parameters.
func New(
// newCollectdReceiver creates the CollectD receiver with the given parameters.
func newCollectdReceiver(
logger *zap.Logger,
addr string,
timeout time.Duration,
defaultAttrsPrefix string,
nextConsumer consumer.MetricsConsumerOld) (component.MetricsReceiver, error) {
nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
if nextConsumer == nil {
return nil, errNilNextConsumer
}
Expand Down Expand Up @@ -144,7 +145,7 @@ func (cdr *collectdReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

err = cdr.nextConsumer.ConsumeMetricsData(ctx, md)
err = cdr.nextConsumer.ConsumeMetrics(ctx, pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md}))
if err != nil {
cdr.handleHTTPErr(w, err, "unable to process metrics")
return
Expand Down
84 changes: 21 additions & 63 deletions receiver/collectdreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"net/http"
"sync"
"testing"
"time"

Expand All @@ -29,8 +28,11 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/testutil"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type metricLabel struct {
Expand All @@ -43,7 +45,7 @@ func TestNewReceiver(t *testing.T) {
addr string
timeout time.Duration
attrsPrefix string
nextConsumer consumer.MetricsConsumerOld
nextConsumer consumer.MetricsConsumer
}
tests := []struct {
name string
Expand All @@ -65,16 +67,16 @@ func TestNewReceiver(t *testing.T) {
addr: ":0",
timeout: defaultTimeout,
attrsPrefix: "default_attr_",
nextConsumer: exportertest.NewNopMetricsExporterOld(),
nextConsumer: exportertest.NewNopMetricsExporter(),
},
},
}
logger := zap.NewNop()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := New(logger, tt.args.addr, time.Second*10, "", tt.args.nextConsumer)
_, err := newCollectdReceiver(logger, tt.args.addr, time.Second*10, "", tt.args.nextConsumer)
if err != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("newCollectdReceiver() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
Expand Down Expand Up @@ -152,10 +154,10 @@ func TestCollectDServer(t *testing.T) {
wantData: []consumerdata.MetricsData{},
}}

sink := newMockMetricsSink(1)
sink := new(exportertest.SinkMetricsExporter)

logger := zap.NewNop()
cdr, err := New(logger, endpoint, defaultTimeout, defaultAttrsPrefix, sink)
cdr, err := newCollectdReceiver(logger, endpoint, defaultTimeout, defaultAttrsPrefix, sink)
if err != nil {
t.Fatalf("Failed to create receiver: %v", err)
}
Expand All @@ -172,8 +174,7 @@ func TestCollectDServer(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {

sink.receivedData = []consumerdata.MetricsData{}
sink.Reset()
req, err := http.NewRequest(
"POST",
"http:https://"+endpoint+"?"+tt.queryParams,
Expand All @@ -191,56 +192,17 @@ func TestCollectDServer(t *testing.T) {
return
}

done := make(chan struct{})
go func() {
sink.Wait()
close(done)
}()

select {
case <-done:
case <-time.After(1 * time.Second):
t.Errorf("timeout: sink did not receive data")
}
assertMetricsDataAreEqual(t, sink.receivedData, tt.wantData)
testutil.WaitFor(t, func() bool {
return len(sink.AllMetrics()) == 1
})
mds := sink.AllMetrics()
require.Len(t, mds, 1)
got := pdatautil.MetricsToMetricsData(mds[0])
assertMetricsDataAreEqual(t, got, tt.wantData)
})
}
}

type mockMetricsSink struct {
wg *sync.WaitGroup
queue chan consumerdata.MetricsData
receivedData []consumerdata.MetricsData
}

func newMockMetricsSink(numReceiveTraceDataCount int) *mockMetricsSink {
wg := &sync.WaitGroup{}
wg.Add(numReceiveTraceDataCount)

sink := &mockMetricsSink{
wg: wg,
queue: make(chan consumerdata.MetricsData),
receivedData: make([]consumerdata.MetricsData, 0, numReceiveTraceDataCount),
}
go func() {
md := <-sink.queue
sink.receivedData = append(sink.receivedData, md)
sink.wg.Done()
}()
return sink
}

var _ consumer.MetricsConsumerOld = (*mockMetricsSink)(nil)

func (m *mockMetricsSink) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
m.queue <- md
return nil
}

func (m *mockMetricsSink) Wait() {
m.wg.Wait()
}

func assertMetricsDataAreEqual(t *testing.T, metricsData1, metricsData2 []consumerdata.MetricsData) {
if len(metricsData1) != len(metricsData2) {
t.Errorf("metrics data length mismatch. got:\n%d\nwant:\n%d\n", len(metricsData1), len(metricsData2))
Expand All @@ -250,15 +212,14 @@ func assertMetricsDataAreEqual(t *testing.T, metricsData1, metricsData2 []consum
for i := 0; i < len(metricsData1); i++ {
md1, md2 := metricsData1[i], metricsData2[i]

if !assert.ObjectsAreEqual(md1.Node, md2.Node) {
if !proto.Equal(md1.Node, md2.Node) {
t.Errorf("metrics data nodes are not equal. got:\n%+v\nwant:\n%+v\n", md1.Node, md2.Node)
}
if !assert.ObjectsAreEqual(md1.Resource, md2.Resource) {
if !proto.Equal(md1.Resource, md2.Resource) {
t.Errorf("metrics data resources are not equal. got:\n%+v\nwant:\n%+v\n", md1.Resource, md2.Resource)
}

assertMetricsAreEqual(t, md1.Metrics, md2.Metrics)

}
}

Expand All @@ -271,7 +232,7 @@ func assertMetricsAreEqual(t *testing.T, metrics1, metrics2 []*metricspb.Metric)
for i := 0; i < len(metrics1); i++ {
m1, m2 := metrics1[i], metrics2[i]

if !assert.ObjectsAreEqual(m1.Resource, m2.Resource) {
if !proto.Equal(m1.Resource, m2.Resource) {
t.Errorf("metric resources are not equal. got:\n%+v\nwant:\n%+v\n", m1.Resource, m2.Resource)
}

Expand Down Expand Up @@ -305,10 +266,7 @@ func assertMetricsAreEqual(t *testing.T, metrics1, metrics2 []*metricspb.Metric)
t.Errorf("labels length mismatch. got:\n%d\nwant:\n%d\n", len(l1), len(l2))
return
}
if !assert.ObjectsAreEqual(l1, l2) {
t.Errorf("metric labels are not equal. got:\n%+v\nwant:\n%+v\n", l1, l2)
}

assert.EqualValues(t, l1, l2)
}
}
}
Expand Down

0 comments on commit 41311cb

Please sign in to comment.