From 4b47a0ff0d7f555f699febcf4850f009ebc45e53 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 14 Dec 2023 11:24:13 -0800 Subject: [PATCH] [chore] fix resourcetotelemetry usage in carbonexporter (#29888) In https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/29879 I forgot to actually plugin the logic, only the config was added there. Added a test to confirm it. Signed-off-by: Bogdan Drutu --- exporter/carbonexporter/exporter.go | 9 +++- exporter/carbonexporter/exporter_test.go | 64 ++++++++++++++++-------- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/exporter/carbonexporter/exporter.go b/exporter/carbonexporter/exporter.go index 2b18fdd71461b..586faf279f90b 100644 --- a/exporter/carbonexporter/exporter.go +++ b/exporter/carbonexporter/exporter.go @@ -12,6 +12,8 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) // newCarbonExporter returns a new Carbon exporter. @@ -20,7 +22,7 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout), } - return exporterhelper.NewMetricsExporter( + exp, err := exporterhelper.NewMetricsExporter( context.TODO(), set, cfg, @@ -29,6 +31,11 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri exporterhelper.WithQueue(cfg.QueueConfig), exporterhelper.WithRetry(cfg.RetryConfig), exporterhelper.WithShutdown(sender.Shutdown)) + if err != nil { + return nil, err + } + + return resourcetotelemetry.WrapMetricsExporter(cfg.ResourceToTelemetryConfig, exp), nil } // carbonSender is the struct tying the translation function and the TCP diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go index 8101e4717527d..37c321c84be9a 100644 --- a/exporter/carbonexporter/exporter_test.go +++ b/exporter/carbonexporter/exporter_test.go @@ -27,6 +27,7 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.9.0" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" ) func TestNewWithDefaultConfig(t *testing.T) { @@ -45,10 +46,31 @@ func TestConsumeMetricsNoServer(t *testing.T) { exportertest.NewNopCreateSettings()) require.NoError(t, err) require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost())) - require.Error(t, exp.ConsumeMetrics(context.Background(), generateLargeBatch())) + require.Error(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch())) require.NoError(t, exp.Shutdown(context.Background())) } +func TestConsumeMetricsWithResourceToTelemetry(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + cs := newCarbonServer(t, addr, "test_0;k0=v0;k1=v1;service.name=test_carbon 0") + // Each metric point will generate one Carbon line, set up the wait + // for all of them. + cs.start(t, 1) + + exp, err := newCarbonExporter( + &Config{ + TCPAddr: confignet.TCPAddr{Endpoint: addr}, + TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second}, + ResourceToTelemetryConfig: resourcetotelemetry.Settings{Enabled: true}, + }, + exportertest.NewNopCreateSettings()) + require.NoError(t, err) + require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch())) + assert.NoError(t, exp.Shutdown(context.Background())) + cs.shutdownAndVerify(t) +} + func TestConsumeMetrics(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147") @@ -94,7 +116,7 @@ func TestConsumeMetrics(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - cs := newCarbonServer(t, addr) + cs := newCarbonServer(t, addr, "") // Each metric point will generate one Carbon line, set up the wait // for all of them. cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount()) @@ -133,25 +155,21 @@ func TestConsumeMetrics(t *testing.T) { } func generateSmallBatch() pmetric.Metrics { - metrics := pmetric.NewMetrics() - m := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - m.SetName("test_gauge") - dp := m.SetEmptyGauge().DataPoints().AppendEmpty() - dp.Attributes().PutStr("k0", "v0") - dp.Attributes().PutStr("k1", "v1") - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) - dp.SetDoubleValue(123) - return metrics + return generateMetricsBatch(1) } func generateLargeBatch() pmetric.Metrics { + return generateMetricsBatch(1024) +} + +func generateMetricsBatch(size int) pmetric.Metrics { ts := time.Now() metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon") ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for i := 0; i < 1028; i++ { + for i := 0; i < size; i++ { m := ms.AppendEmpty() m.SetName("test_" + strconv.Itoa(i)) dp := m.SetEmptyGauge().DataPoints().AppendEmpty() @@ -165,19 +183,21 @@ func generateLargeBatch() pmetric.Metrics { } type carbonServer struct { - ln *net.TCPListener - doneServer *atomic.Bool - wg sync.WaitGroup + ln *net.TCPListener + doneServer *atomic.Bool + wg sync.WaitGroup + expectedContainsValue string } -func newCarbonServer(t *testing.T, addr string) *carbonServer { +func newCarbonServer(t *testing.T, addr string, expectedContainsValue string) *carbonServer { laddr, err := net.ResolveTCPAddr("tcp", addr) require.NoError(t, err) ln, err := net.ListenTCP("tcp", laddr) require.NoError(t, err) return &carbonServer{ - ln: ln, - doneServer: &atomic.Bool{}, + ln: ln, + doneServer: &atomic.Bool{}, + expectedContainsValue: expectedContainsValue, } } @@ -198,14 +218,16 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) { reader := bufio.NewReader(conn) for { - // Actual metric validation is done by other tests, here it - // is just flow. - _, err := reader.ReadBytes(byte('\n')) + buf, err := reader.ReadBytes(byte('\n')) if errors.Is(err, io.EOF) { return } require.NoError(t, err) + if cs.expectedContainsValue != "" { + assert.Contains(t, string(buf), cs.expectedContainsValue) + } + cs.wg.Done() } }(conn)