Skip to content

Commit

Permalink
Update carbon exporter to the new interface (#761)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent 14fe86a commit fcf95eb
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 223 deletions.
2 changes: 1 addition & 1 deletion cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func components() (component.Factories, error) {
sapmexporter.NewFactory(),
kinesisexporter.NewFactory(),
awsxrayexporter.NewFactory(),
&carbonexporter.Factory{},
carbonexporter.NewFactory(),
&honeycombexporter.Factory{},
&jaegerthrifthttpexporter.Factory{},
&lightstepexporter.Factory{},
Expand Down
32 changes: 0 additions & 32 deletions exporter/carbonexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,3 @@ type Config struct {
// The default value is defined by the DefaultSendTimeout constant.
Timeout time.Duration `mapstructure:"timeout"`
}

// convenience function so the default can be created without instantiating the
// factory.
func defaultConfig() *Config {
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
Endpoint: DefaultEndpoint,
Timeout: DefaultSendTimeout,
}
}

// ensures that all defaults are set in the case of zero values.
func setDefaults(cfg Config) Config {
defaultCfg := defaultConfig()

if cfg.ExporterSettings.TypeVal == "" {
cfg.ExporterSettings.TypeVal = defaultCfg.ExporterSettings.TypeVal
}
if cfg.ExporterSettings.NameVal == "" {
cfg.ExporterSettings.NameVal = defaultCfg.ExporterSettings.NameVal
}
if cfg.Endpoint == "" {
cfg.Endpoint = defaultCfg.Endpoint
}
if cfg.Timeout == 0 {
cfg.Timeout = defaultCfg.Timeout
}
return cfg
}
14 changes: 4 additions & 10 deletions exporter/carbonexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package carbonexporter

import (
"context"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
Expand All @@ -31,7 +33,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.Nil(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Exporters[configmodels.Type(typeStr)] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

Expand All @@ -42,7 +44,6 @@ func TestLoadConfig(t *testing.T) {

defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, defaultCfg, e0)
assert.Equal(t, defaultCfg, defaultConfig())

expectedName := "carbon/allsettings"

Expand All @@ -57,14 +58,7 @@ func TestLoadConfig(t *testing.T) {
}
assert.Equal(t, &expectedCfg, e1)

te, err := factory.CreateMetricsExporter(zap.NewNop(), e1)
te, err := factory.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, e1)
require.NoError(t, err)
require.NotNil(t, te)
}

func Test_setDefaults(t *testing.T) {
// Zero-value Config must match default config createcd via factory.
cfg := setDefaults(Config{})
factoryDefaultCfg := (&Factory{}).CreateDefaultConfig()
assert.Equal(t, factoryDefaultCfg, &cfg)
}
35 changes: 13 additions & 22 deletions exporter/carbonexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,30 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// New returns a new Carbon exporter.
func New(config Config) (component.MetricsExporterOld, error) {
effectiveConfig := setDefaults(config)

// newCarbonExporter returns a new Carbon exporter.
func newCarbonExporter(cfg *Config) (component.MetricsExporter, error) {
// Resolve TCP address just to ensure that it is a valid one. It is better
// to fail here than at when the exporter is started.
if _, err := net.ResolveTCPAddr("tcp", effectiveConfig.Endpoint); err != nil {
return nil, fmt.Errorf(
"%q exporter has an invalid TCP endpoint: %v",
effectiveConfig.Name(),
err)
if _, err := net.ResolveTCPAddr("tcp", cfg.Endpoint); err != nil {
return nil, fmt.Errorf("%q exporter has an invalid TCP endpoint: %w", cfg.Name(), err)
}

// Negative timeouts are not acceptable, since all sends will fail.
if effectiveConfig.Timeout < 0 {
return nil, fmt.Errorf(
"%q exporter requires a positive timeout",
effectiveConfig.Name())
if cfg.Timeout < 0 {
return nil, fmt.Errorf("%q exporter requires a positive timeout", cfg.Name())
}

sender := carbonSender{
connPool: newTCPConnPool(effectiveConfig.Endpoint, effectiveConfig.Timeout),
connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
}

return exporterhelper.NewMetricsExporterOld(
&effectiveConfig.ExporterSettings,
return exporterhelper.NewMetricsExporter(
&cfg.ExporterSettings,
sender.pushMetricsData,
exporterhelper.WithShutdown(sender.Shutdown))
}
Expand All @@ -63,11 +57,8 @@ type carbonSender struct {
connPool *connPool
}

func (cs *carbonSender) pushMetricsData(
ctx context.Context,
md consumerdata.MetricsData,
) (int, error) {
lines, converted, dropped := metricDataToPlaintext(md)
func (cs *carbonSender) pushMetricsData(_ context.Context, md pdata.Metrics) (int, error) {
lines, converted, dropped := metricDataToPlaintext(pdatautil.MetricsToMetricsData(md))

if _, err := cs.connPool.Write([]byte(lines)); err != nil {
// Use the sum of converted and dropped since the write failed for all.
Expand Down
63 changes: 34 additions & 29 deletions exporter/carbonexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,40 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/testutil"
"go.opentelemetry.io/collector/testutil/metricstestutil"
)

func TestNew(t *testing.T) {
tests := []struct {
name string
config Config
config *Config
wantErr bool
}{
{
name: "zero_value_config",
name: "default_config",
config: createDefaultConfig().(*Config),
},
{
name: "invalid_tcp_addr",
config: Config{
config: &Config{
Endpoint: "http:https://localhost:2003",
},
wantErr: true,
},
{
name: "invalid_timeout",
config: Config{
config: &Config{
Timeout: -5 * time.Second,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(tt.config)
got, err := newCarbonExporter(tt.config)
if tt.wantErr {
assert.Nil(t, got)
assert.Error(t, err)
Expand All @@ -80,23 +82,25 @@ func TestNew(t *testing.T) {
func TestConsumeMetricsData(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

smallBatch := consumerdata.MetricsData{
Metrics: []*metricspb.Metric{
metricstestutil.Gauge(
"test_gauge",
[]string{"k0", "k1"},
metricstestutil.Timeseries(
time.Now(),
[]string{"v0", "v1"},
metricstestutil.Double(time.Now(), 123))),
smallBatch := pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{
{
Metrics: []*metricspb.Metric{
metricstestutil.Gauge(
"test_gauge",
[]string{"k0", "k1"},
metricstestutil.Timeseries(
time.Now(),
[]string{"v0", "v1"},
metricstestutil.Double(time.Now(), 123))),
},
},
}
})

largeBatch := generateLargeBatch(t)
largeBatch := generateLargeBatch()

tests := []struct {
name string
md consumerdata.MetricsData
md pdata.Metrics
acceptClient bool
createServer bool
}{
Expand Down Expand Up @@ -144,14 +148,14 @@ func TestConsumeMetricsData(t *testing.T) {
defer ln.Close()
}

config := Config{Endpoint: addr, Timeout: 500 * time.Millisecond}
exp, err := New(config)
config := &Config{Endpoint: addr, Timeout: 500 * time.Millisecond}
exp, err := newCarbonExporter(config)
require.NoError(t, err)

require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))

if !tt.createServer {
require.Error(t, exp.ConsumeMetricsData(context.Background(), tt.md))
require.Error(t, exp.ConsumeMetrics(context.Background(), tt.md))
assert.NoError(t, exp.Shutdown(context.Background()))
return
}
Expand All @@ -161,15 +165,16 @@ func TestConsumeMetricsData(t *testing.T) {
// call to ConsumeMetricsData below will produce error or not.
// See comment about recvfrom at connPool.Write for detailed
// information.
exp.ConsumeMetricsData(context.Background(), tt.md)
exp.ConsumeMetrics(context.Background(), tt.md)
assert.NoError(t, exp.Shutdown(context.Background()))
return
}

// Each time series will generate one Carbon line, set up the wait
// Each metric point will generate one Carbon line, set up the wait
// for all of them.
var wg sync.WaitGroup
wg.Add(exporterhelper.NumTimeSeries(tt.md))
_, mpc := pdatautil.MetricAndDataPointCount(tt.md)
wg.Add(mpc)
go func() {
ln.SetDeadline(time.Now().Add(time.Second))
conn, err := ln.AcceptTCP()
Expand All @@ -192,7 +197,7 @@ func TestConsumeMetricsData(t *testing.T) {
}
}()

require.NoError(t, exp.ConsumeMetricsData(context.Background(), tt.md))
require.NoError(t, exp.ConsumeMetrics(context.Background(), tt.md))
assert.NoError(t, exp.Shutdown(context.Background()))

wg.Wait()
Expand All @@ -215,7 +220,7 @@ func Test_connPool_Concurrency(t *testing.T) {
cp := newTCPConnPool(addr, 500*time.Millisecond)
sender := carbonSender{connPool: cp}
ctx := context.Background()
md := generateLargeBatch(t)
md := generateLargeBatch()
concurrentWriters := 3
writesPerRoutine := 3

Expand All @@ -225,7 +230,7 @@ func Test_connPool_Concurrency(t *testing.T) {
}(&doneFlag)

var recvWG sync.WaitGroup
recvWG.Add(concurrentWriters * writesPerRoutine * len(md.Metrics))
recvWG.Add(concurrentWriters * writesPerRoutine * pdatautil.MetricCount(md))
go func() {
for {
conn, err := ln.AcceptTCP()
Expand Down Expand Up @@ -275,7 +280,7 @@ func Test_connPool_Concurrency(t *testing.T) {
recvWG.Wait()
}

func generateLargeBatch(t *testing.T) consumerdata.MetricsData {
func generateLargeBatch() pdata.Metrics {
md := consumerdata.MetricsData{
Node: &commonpb.Node{
ServiceInfo: &commonpb.ServiceInfo{Name: "test_carbon"},
Expand All @@ -301,5 +306,5 @@ func generateLargeBatch(t *testing.T) consumerdata.MetricsData {
)
}

return md
return pdatautil.MetricsFromMetricsData([]consumerdata.MetricsData{md})
}
51 changes: 23 additions & 28 deletions exporter/carbonexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,43 @@
package carbonexporter

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.uber.org/zap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const (
// The value of "type" key in configuration.
typeStr = "carbon"
)

// Factory is the factory for Carbon exporter.
type Factory struct {
}

// Type gets the type of the Exporter config created by this factory.
func (f *Factory) Type() configmodels.Type {
return configmodels.Type(typeStr)
}

// CreateDefaultConfig creates the default configuration for exporter.
func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
return defaultConfig()
// NewFactory creates a factory for Carbon exporter.
func NewFactory() component.ExporterFactory {
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithMetrics(createMetricsExporter))
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(
logger *zap.Logger,
config configmodels.Exporter,
) (component.TraceExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
func createDefaultConfig() configmodels.Exporter {
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
Endpoint: DefaultEndpoint,
Timeout: DefaultSendTimeout,
}
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(
logger *zap.Logger,
func createMetricsExporter(
_ context.Context,
_ component.ExporterCreateParams,
config configmodels.Exporter,
) (component.MetricsExporterOld, error) {

expCfg := config.(*Config)

exp, err := New(*expCfg)
) (component.MetricsExporter, error) {
exp, err := newCarbonExporter(config.(*Config))

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit fcf95eb

Please sign in to comment.