Skip to content

Commit

Permalink
[exporter/elasticsearch] Extend integration tests to cover traces (op…
Browse files Browse the repository at this point in the history
…en-telemetry#32512)

Enable integration tests for both traces and logs,
previously only logs were tested.
  • Loading branch information
lahsivjar committed May 1, 2024
1 parent 75af982 commit c0cfdfc
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions exporter/elasticsearchexporter/integrationtest/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package integrationtest

import (
"errors"
"fmt"
"sync/atomic"
"testing"
"time"
Expand All @@ -15,41 +16,62 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

func TestLogExporter(t *testing.T) {
for _, tc := range []struct {
name string
// restartCollector restarts the OTEL collector. Restarting
// the collector allows durability testing of the ES exporter
// based on the OTEL config used for testing.
restartCollector bool
mockESFailure bool
}{
{name: "basic"},
{name: "es_intermittent_failure", mockESFailure: true},
/* Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed
{name: "collector_restarts", restartCollector: true},
{name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true},
*/
} {
t.Run(tc.name, func(t *testing.T) {
runner(t, tc.restartCollector, tc.mockESFailure)
})
func TestExporter(t *testing.T) {
// NOTE: The data receiver/mock backend will receive and process traces
// as log document. This does not reduce the effectiveness of testing as
// the assertions can still be made without considering for data types
// in the mock backend. Adding support for traces in the mock data
// receiver is possible, however, distinguishing between traces and
// logs is not straightforward after the document has been encoded and
// doesn't add any practical benefits to the test.
for _, eventType := range []string{"logs", "traces"} {
for _, tc := range []struct {
name string
// restartCollector restarts the OTEL collector. Restarting
// the collector allows durability testing of the ES exporter
// based on the OTEL config used for testing.
restartCollector bool
mockESFailure bool
}{
{name: "basic"},
{name: "es_intermittent_failure", mockESFailure: true},
/* TODO: Below tests should be enabled after https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30792 is fixed
{name: "collector_restarts", restartCollector: true},
{name: "collector_restart_with_es_intermittent_failure", mockESFailure: true, restartCollector: true},
*/
} {
t.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(t *testing.T) {
runner(t, eventType, tc.restartCollector, tc.mockESFailure)
})
}
}
}

func runner(t *testing.T, restartCollector, mockESFailure bool) {
func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool) {
t.Helper()

sender := testbed.NewOTLPLogsDataSender(
testbed.DefaultHost, testutil.GetAvailablePort(t),
var (
sender testbed.DataSender
host = testbed.DefaultHost
port = testutil.GetAvailablePort(t)
)
switch eventType {
case "logs":
sender = testbed.NewOTLPLogsDataSender(host, port)
case "traces":
sender = testbed.NewOTLPTraceDataSender(host, port)
default:
t.Fatalf("failed to create data sender for type: %s", eventType)
}

receiver := newElasticsearchDataReceiver(t)
provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{
DataItemsPerSecond: 10_000,
loadOpts := testbed.LoadOptions{
DataItemsPerSecond: 1_000,
ItemsPerBatch: 10,
})
}
provider := testbed.NewPerfTestDataProvider(loadOpts)

cfg := createConfigYaml(t, sender, receiver, nil, nil, "logs", getDebugFlag(t))
cfg := createConfigYaml(t, sender, receiver, nil, nil, eventType, getDebugFlag(t))
t.Log("test otel collector configuration:", cfg)
collector := newRecreatableOtelCol(t)
cleanup, err := collector.PrepareConfig(cfg)
Expand Down Expand Up @@ -79,7 +101,7 @@ func runner(t *testing.T, restartCollector, mockESFailure bool) {
tc.StartAgent()

// Start sending load and send for some time before proceeding.
tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 1_000})
tc.StartLoad(loadOpts)
tc.Sleep(2 * time.Second)

// Fail ES if required and send load.
Expand Down

0 comments on commit c0cfdfc

Please sign in to comment.