Skip to content

Commit

Permalink
[chore]: Expand e2e testbed (open-telemetry#27251)
Browse files Browse the repository at this point in the history
Related issue:
open-telemetry#20552

Tweak the mock-backend to do the following:

- Receives data from the receiver.
- Returns errors randomly to our receiver, which attempts to resend/drop
the data.

This is helpful when we're required to test random behaviors of the
collector and ensure reliable data delivery.


This is my initial PR to expand the testbed. This will help my further
efforts to expand the testbed.
@omrozowicz-splunk and I plan to add `sending_queue` support to
the testbed and expand its testing capabilities.

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
  • Loading branch information
2 people authored and JaredTan95 committed Oct 18, 2023
1 parent bb36f05 commit c38a394
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 25 deletions.
60 changes: 60 additions & 0 deletions testbed/correctnesstests/traces/correctness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,63 @@ func testWithTracingGoldenDataset(

tc.ValidateData()
}

// TestSporadicGoldenDataset sends the golden trace dataset through an
// in-process collector while the mock backend rejects requests at random,
// once with non-permanent errors and once with permanent errors. After the
// load stops it waits until every item that was not permanently rejected has
// reached the backend and then validates the received data.
//
// Each scenario runs in its own subtest so that the deferred tc.Stop() fires
// as soon as that scenario finishes instead of piling up until the whole test
// function returns.
func TestSporadicGoldenDataset(t *testing.T) {
	testCases := []struct {
		name         string
		decisionFunc func() error
	}{
		{
			name:         "non_permanent_errors",
			decisionFunc: testbed.RandomNonPermanentError,
		},
		{
			name:         "permanent_errors",
			decisionFunc: testbed.RandomPermanentError,
		},
	}
	for _, tt := range testCases {
		// The subtest is not parallel, so it completes before the next
		// iteration and capturing tt in the closure is safe.
		t.Run(tt.name, func(t *testing.T) {
			factories, err := testbed.Components()
			require.NoError(t, err, "default components resulted in: %v", err)
			runner := testbed.NewInProcessCollector(factories)
			options := testbed.LoadOptions{DataItemsPerSecond: 10000, ItemsPerBatch: 10}
			dataProvider := testbed.NewGoldenDataProvider(
				"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_traces.txt",
				"../../../internal/coreinternal/goldendataset/testdata/generated_pict_pairs_spans.txt",
				"")
			sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t))
			receiver := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
			// Retry and queueing are switched off in the exporter config,
			// presumably so that backend errors reach the load generator
			// directly — TODO confirm.
			// NOTE(review): these snippets are spliced verbatim into the
			// generated exporter YAML; confirm their indentation matches the
			// nesting expected there.
			receiver.WithRetry(`
retry_on_failure:
enabled: false
`)
			receiver.WithQueue(`
sending_queue:
enabled: false
`)
			_, err = runner.PrepareConfig(correctnesstests.CreateConfigYaml(sender, receiver, nil, "traces"))
			require.NoError(t, err, "collector configuration resulted in: %v", err)
			validator := testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), dataProvider)
			tc := testbed.NewTestCase(
				t,
				dataProvider,
				sender,
				receiver,
				runner,
				validator,
				correctnessResults,
				testbed.WithSkipResults(),
				testbed.WithDecisionFunc(tt.decisionFunc),
			)
			defer tc.Stop()
			tc.StartBackend()
			tc.StartAgent()
			tc.StartLoad(options)
			tc.Sleep(3 * time.Second)

			tc.StopLoad()

			// Items rejected with a permanent error never arrive, so they are
			// subtracted from the number of items sent.
			tc.WaitForN(func() bool {
				return tc.LoadGenerator.DataItemsSent()-tc.LoadGenerator.PermanentErrors() == tc.MockBackend.DataItemsReceived()
			}, 5*time.Second, "all data items received")
			tc.StopAgent()
			tc.ValidateData()
		})
	}
}
2 changes: 1 addition & 1 deletion testbed/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/text v0.13.0
google.golang.org/grpc v1.58.3
)

require (
Expand Down Expand Up @@ -259,7 +260,6 @@ require (
google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
93 changes: 73 additions & 20 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/collector/consumer/consumererror"
"golang.org/x/text/message"
)

Expand All @@ -25,6 +26,10 @@ type LoadGenerator struct {
// Number of data items (spans or metric data points) sent.
dataItemsSent atomic.Uint64

// Number of data items whose send failed with a permanent error (permanentErrors)
// or with a non-permanent error (nonPermanentErrors).
permanentErrors atomic.Uint64
nonPermanentErrors atomic.Uint64

stopOnce sync.Once
stopWait sync.WaitGroup
stopSignal chan struct{}
Expand Down Expand Up @@ -109,6 +114,14 @@ func (lg *LoadGenerator) DataItemsSent() uint64 {
return lg.dataItemsSent.Load()
}

// PermanentErrors returns the current value of the permanent-error counter,
// i.e. the number of data items whose send failed with a permanent error.
func (lg *LoadGenerator) PermanentErrors() uint64 {
	count := lg.permanentErrors.Load()
	return count
}

// NonPermanentErrors returns the current value of the non-permanent-error
// counter, i.e. the number of data items whose send failed with an error that
// is not permanent.
func (lg *LoadGenerator) NonPermanentErrors() uint64 {
	count := lg.nonPermanentErrors.Load()
	return count
}

// IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data
// directly via TestCases's Sender. This is necessary so that the total number of sent
// items in the end is correct, because the reports are printed from LoadGenerator's
Expand Down Expand Up @@ -184,12 +197,26 @@ func (lg *LoadGenerator) generateTrace() {
return
}

err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send traces: %v", err)
for {
err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(traceData.SpanCount()))
continue
}

lg.permanentErrors.Add(uint64(traceData.SpanCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send traces: %v", err)
}
break
}
}

Expand All @@ -200,13 +227,26 @@ func (lg *LoadGenerator) generateMetrics() {
if done {
return
}

err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send metrics: %v", err)
for {
err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(metricData.DataPointCount()))
continue
}

lg.permanentErrors.Add(uint64(metricData.DataPointCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send metrics: %v", err)
}
break
}
}

Expand All @@ -217,12 +257,25 @@ func (lg *LoadGenerator) generateLog() {
if done {
return
}

err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
lg.prevErr = nil
} else if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send logs: %v", err)
for {
err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
lg.prevErr = nil
break
}

if !consumererror.IsPermanent(err) {
lg.nonPermanentErrors.Add(uint64(logData.LogRecordCount()))
continue
}

lg.permanentErrors.Add(uint64(logData.LogRecordCount()))

// update prevErr to err if it's different than last observed error
if lg.prevErr == nil || lg.prevErr.Error() != err.Error() {
lg.prevErr = err
log.Printf("Cannot send logs: %v", err)
}
break
}
}
63 changes: 63 additions & 0 deletions testbed/testbed/mock_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,28 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"errors"
"log"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Base errors used by the random decision functions to build the failures the
// mock backend returns.
var (
	errNonPermanent = errors.New("non permanent error")
	errPermanent    = errors.New("permanent error")
)

// decisionFunc decides the outcome of a single consume call on the mock
// backend: a nil result accepts the data, a non-nil error rejects it.
type decisionFunc func() error

// MockBackend is a backend that allows receiving the data locally.
type MockBackend struct {
// Metric and trace consumers
Expand All @@ -41,6 +51,13 @@ type MockBackend struct {
ReceivedTraces []ptrace.Traces
ReceivedMetrics []pmetric.Metrics
ReceivedLogs []plog.Logs

DroppedTraces []ptrace.Traces
DroppedMetrics []pmetric.Metrics
DroppedLogs []plog.Logs

// decision to return permanent/non-permanent errors
decision decisionFunc
}

// NewMockBackend creates a new mock backend that receives data using specified receiver.
Expand All @@ -51,13 +68,18 @@ func NewMockBackend(logFilePath string, receiver DataReceiver) *MockBackend {
tc: &MockTraceConsumer{},
mc: &MockMetricConsumer{},
lc: &MockLogConsumer{},
decision: func() error { return nil },
}
mb.tc.backend = mb
mb.mc.backend = mb
mb.lc.backend = mb
return mb
}

// WithDecisionFunc sets the function the mock consumers call at the start of
// every consume call: a nil result accepts the data, a non-nil error is
// returned to the sender instead.
//
// A nil decision falls back to always accepting, because the consumers call
// the stored function unconditionally and would otherwise panic.
func (mb *MockBackend) WithDecisionFunc(decision decisionFunc) {
	if decision == nil {
		decision = func() error { return nil }
	}
	mb.decision = decision
}

// Start a backend.
func (mb *MockBackend) Start() error {
log.Printf("Starting mock backend...")
Expand Down Expand Up @@ -161,6 +183,13 @@ func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities {
}

func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td ptrace.Traces) error {
if err := tc.backend.decision(); err != nil {
if consumererror.IsPermanent(err) && tc.backend.isRecording {
tc.backend.DroppedTraces = append(tc.backend.DroppedTraces, td)
}
return err
}

tc.numSpansReceived.Add(uint64(td.SpanCount()))

rs := td.ResourceSpans()
Expand Down Expand Up @@ -208,6 +237,13 @@ func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities {
}

func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error {
if err := mc.backend.decision(); err != nil {
if consumererror.IsPermanent(err) && mc.backend.isRecording {
mc.backend.DroppedMetrics = append(mc.backend.DroppedMetrics, md)
}
return err
}

mc.numMetricsReceived.Add(uint64(md.DataPointCount()))
mc.backend.ConsumeMetric(md)
return nil
Expand All @@ -233,8 +269,35 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities {
}

// ConsumeLogs first consults the backend's decision function. If it returns
// an error, the logs are rejected with that error; when the error is
// permanent and the backend is recording, the payload is also appended to
// DroppedLogs. Otherwise the log record count is added to the received
// counter and the payload is handed to the backend.
func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
	err := lc.backend.decision()
	if err != nil {
		dropped := consumererror.IsPermanent(err) && lc.backend.isRecording
		if dropped {
			lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld)
		}
		return err
	}

	lc.numLogRecordsReceived.Add(uint64(ld.LogRecordCount()))
	lc.backend.ConsumeLogs(ld)
	return nil
}

// randomNonPermanentError is a decision function that succeeds approximately
// half of the time and fails with a non-permanent error the rest of the time.
func RandomNonPermanentError() error {
code := codes.Unavailable
s := status.New(code, errNonPermanent.Error())
if rand.Float32() < 0.5 {
return s.Err()
}
return nil
}

// RandomPermanentError is a decision function that succeeds approximately
// half of the time and fails with a permanent error the rest of the time.
// The failure wraps errPermanent with consumererror.NewPermanent.
func RandomPermanentError() error {
	if rand.Float32() >= 0.5 {
		return nil
	}
	return consumererror.NewPermanent(errPermanent)
}
7 changes: 7 additions & 0 deletions testbed/testbed/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,10 @@ func WithResourceLimits(resourceSpec ResourceSpec) TestCaseOption {
}
}
}

// WithDecisionFunc returns a TestCaseOption that stores decision on the test
// case, enabling the mock backend to behave sporadically.
func WithDecisionFunc(decision decisionFunc) TestCaseOption {
	apply := func(tc *TestCase) {
		tc.decision = decision
	}
	return apply
}
17 changes: 15 additions & 2 deletions testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type BaseOTLPDataReceiver struct {
metricsReceiver receiver.Metrics
logReceiver receiver.Logs
compression string
retry string
sendingQueue string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -91,6 +93,16 @@ func (bor *BaseOTLPDataReceiver) WithCompression(compression string) *BaseOTLPDa
return bor
}

// WithRetry stores the given retry configuration snippet on the receiver and
// returns the same receiver so calls can be chained. The snippet is inserted
// as-is into the exporter configuration produced by GenConfigYAMLStr.
func (bor *BaseOTLPDataReceiver) WithRetry(retry string) *BaseOTLPDataReceiver {
	configured := bor
	configured.retry = retry
	return configured
}

// WithQueue stores the given sending-queue configuration snippet on the
// receiver and returns the same receiver so calls can be chained. The snippet
// is inserted as-is into the exporter configuration produced by
// GenConfigYAMLStr.
func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver {
	configured := bor
	configured.sendingQueue = sendingQueue
	return configured
}

func (bor *BaseOTLPDataReceiver) Stop() error {
if err := bor.traceReceiver.Shutdown(context.Background()); err != nil {
return err
Expand All @@ -114,9 +126,10 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
str := fmt.Sprintf(`
%s:
endpoint: "%s"
%s
%s
tls:
insecure: true`, bor.exporterType, addr)

insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
Loading

0 comments on commit c38a394

Please sign in to comment.