Skip to content

Commit

Permalink
[chore] fix resourcetotelemetry usage in carbonexporter (open-telemet…
Browse files Browse the repository at this point in the history
…ry#29888)

In
open-telemetry#29879
I forgot to actually plugin the logic, only the config was added there.
Added a test to confirm it.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 14, 2023
1 parent b4563f3 commit 4b47a0f
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 22 deletions.
9 changes: 8 additions & 1 deletion exporter/carbonexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

// newCarbonExporter returns a new Carbon exporter.
Expand All @@ -20,7 +22,7 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
}

return exporterhelper.NewMetricsExporter(
exp, err := exporterhelper.NewMetricsExporter(
context.TODO(),
set,
cfg,
Expand All @@ -29,6 +31,11 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
exporterhelper.WithQueue(cfg.QueueConfig),
exporterhelper.WithRetry(cfg.RetryConfig),
exporterhelper.WithShutdown(sender.Shutdown))
if err != nil {
return nil, err
}

return resourcetotelemetry.WrapMetricsExporter(cfg.ResourceToTelemetryConfig, exp), nil
}

// carbonSender is the struct tying the translation function and the TCP
Expand Down
64 changes: 43 additions & 21 deletions exporter/carbonexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.9.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

func TestNewWithDefaultConfig(t *testing.T) {
Expand All @@ -45,10 +46,31 @@ func TestConsumeMetricsNoServer(t *testing.T) {
exportertest.NewNopCreateSettings())
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, exp.ConsumeMetrics(context.Background(), generateLargeBatch()))
require.Error(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
require.NoError(t, exp.Shutdown(context.Background()))
}

func TestConsumeMetricsWithResourceToTelemetry(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
cs := newCarbonServer(t, addr, "test_0;k0=v0;k1=v1;service.name=test_carbon 0")
// Each metric point will generate one Carbon line, set up the wait
// for all of them.
cs.start(t, 1)

exp, err := newCarbonExporter(
&Config{
TCPAddr: confignet.TCPAddr{Endpoint: addr},
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
ResourceToTelemetryConfig: resourcetotelemetry.Settings{Enabled: true},
},
exportertest.NewNopCreateSettings())
require.NoError(t, err)
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, exp.ConsumeMetrics(context.Background(), generateSmallBatch()))
assert.NoError(t, exp.Shutdown(context.Background()))
cs.shutdownAndVerify(t)
}

func TestConsumeMetrics(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147")
Expand Down Expand Up @@ -94,7 +116,7 @@ func TestConsumeMetrics(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
cs := newCarbonServer(t, addr)
cs := newCarbonServer(t, addr, "")
// Each metric point will generate one Carbon line, set up the wait
// for all of them.
cs.start(t, tt.numProducers*tt.writesPerProducer*tt.md.DataPointCount())
Expand Down Expand Up @@ -133,25 +155,21 @@ func TestConsumeMetrics(t *testing.T) {
}

func generateSmallBatch() pmetric.Metrics {
metrics := pmetric.NewMetrics()
m := metrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
m.SetName("test_gauge")
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.Attributes().PutStr("k0", "v0")
dp.Attributes().PutStr("k1", "v1")
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
dp.SetDoubleValue(123)
return metrics
return generateMetricsBatch(1)
}

func generateLargeBatch() pmetric.Metrics {
return generateMetricsBatch(1024)
}

func generateMetricsBatch(size int) pmetric.Metrics {
ts := time.Now()
metrics := pmetric.NewMetrics()
rm := metrics.ResourceMetrics().AppendEmpty()
rm.Resource().Attributes().PutStr(conventions.AttributeServiceName, "test_carbon")
ms := rm.ScopeMetrics().AppendEmpty().Metrics()

for i := 0; i < 1028; i++ {
for i := 0; i < size; i++ {
m := ms.AppendEmpty()
m.SetName("test_" + strconv.Itoa(i))
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
Expand All @@ -165,19 +183,21 @@ func generateLargeBatch() pmetric.Metrics {
}

type carbonServer struct {
ln *net.TCPListener
doneServer *atomic.Bool
wg sync.WaitGroup
ln *net.TCPListener
doneServer *atomic.Bool
wg sync.WaitGroup
expectedContainsValue string
}

func newCarbonServer(t *testing.T, addr string) *carbonServer {
func newCarbonServer(t *testing.T, addr string, expectedContainsValue string) *carbonServer {
laddr, err := net.ResolveTCPAddr("tcp", addr)
require.NoError(t, err)
ln, err := net.ListenTCP("tcp", laddr)
require.NoError(t, err)
return &carbonServer{
ln: ln,
doneServer: &atomic.Bool{},
ln: ln,
doneServer: &atomic.Bool{},
expectedContainsValue: expectedContainsValue,
}
}

Expand All @@ -198,14 +218,16 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) {

reader := bufio.NewReader(conn)
for {
// Actual metric validation is done by other tests, here it
// is just flow.
_, err := reader.ReadBytes(byte('\n'))
buf, err := reader.ReadBytes(byte('\n'))
if errors.Is(err, io.EOF) {
return
}
require.NoError(t, err)

if cs.expectedContainsValue != "" {
assert.Contains(t, string(buf), cs.expectedContainsValue)
}

cs.wg.Done()
}
}(conn)
Expand Down

0 comments on commit 4b47a0f

Please sign in to comment.