[chore] testbed: sending queue otlp (open-telemetry#28904)
**Description:** This PR adds two tests with an OTLP receiver and an OTLP
exporter that uses sending queues. We cover two scenarios:

Sending queue full
1. We generate non-permanent errors until the `sending_queue is full` log
appears in the agent's logs.
2. Then we collect the IDs of the logs that were meant to be retried and the
IDs of the logs that were received successfully, and check that all of the
former were eventually delivered.

The current testbed cannot observe errors from the load generator's
perspective, so I added `LogsToRetry` to `mock_backend` to track which logs
failed with a non-permanent error; a sketch of the resulting check follows.
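A minimal sketch of that check, assuming a hypothetical `extractIDs` helper
that pulls a unique per-record ID out of a batch (the real assertion lives in
the scenario code, which is not part of this diff):

```go
// assertAllRetried is a hypothetical helper: every batch recorded in
// LogsToRetry (i.e. rejected with a non-permanent error) must eventually
// show up among the batches the backend received successfully.
func assertAllRetried(t *testing.T, mb *testbed.MockBackend, extractIDs func(plog.Logs) []string) {
	received := make(map[string]struct{})
	for _, ld := range mb.ReceivedLogs {
		for _, id := range extractIDs(ld) {
			received[id] = struct{}{}
		}
	}
	for _, ld := range mb.LogsToRetry {
		for _, id := range extractIDs(ld) {
			if _, ok := received[id]; !ok {
				t.Errorf("log %s hit a non-permanent error but was never retried", id)
			}
		}
	}
}
```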

Sending queue not full
A sanity test that checks the default behavior of the sending queue without
filling it up.

So far only logs sending queues are covered; I'm not sure whether we should
add this for every data type. Currently, one test takes about 9s.

**Link to tracking Issue:** A related issue is
open-telemetry#20552, as these tests cover the
`Exporter helper QueuedRetry queue limit size is hit` scenario.

---------

Co-authored-by: Pablo Baeyens <[email protected]>
omrozowicz-splunk and mx-psi committed Jan 4, 2024
1 parent d95d8c6 commit 2fe33d3
Showing 6 changed files with 331 additions and 4 deletions.
24 changes: 20 additions & 4 deletions testbed/testbed/mock_backend.go
@@ -56,6 +56,8 @@ type MockBackend struct {
 	DroppedMetrics []pmetric.Metrics
 	DroppedLogs    []plog.Logs
 
+	LogsToRetry []plog.Logs
+
 	// decision to return permanent/non-permanent errors
 	decision decisionFunc
 }
@@ -166,8 +168,6 @@ func (mb *MockBackend) ConsumeMetric(md pmetric.Metrics) {
 var _ consumer.Traces = (*MockTraceConsumer)(nil)
 
 func (mb *MockBackend) ConsumeLogs(ld plog.Logs) {
-	mb.recordMutex.Lock()
-	defer mb.recordMutex.Unlock()
 	if mb.isRecording {
 		mb.ReceivedLogs = append(mb.ReceivedLogs, ld)
 	}
@@ -269,9 +269,15 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities {
 }
 
 func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
+	lc.backend.recordMutex.Lock()
+	defer lc.backend.recordMutex.Unlock()
 	if err := lc.backend.decision(); err != nil {
-		if consumererror.IsPermanent(err) && lc.backend.isRecording {
-			lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld)
+		if lc.backend.isRecording {
+			if consumererror.IsPermanent(err) {
+				lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld)
+			} else {
+				lc.backend.LogsToRetry = append(lc.backend.LogsToRetry, ld)
+			}
 		}
 		return err
 	}
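For reference, the two error classes distinguished above can be constructed
like this (a sketch using the consumererror and gRPC status packages this
file already relies on, plus the standard errors package):

```go
// A permanent error: the exporter drops the batch and never retries it.
permanent := consumererror.NewPermanent(errors.New("bad payload"))

// A non-permanent error: the exporter keeps the batch in its sending
// queue and retries it later.
transient := status.New(codes.Unavailable, "try again later").Err()
```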
@@ -293,6 +299,16 @@ func RandomNonPermanentError() error {
 	return nil
 }
 
+// GenerateNonPernamentErrorUntil is a decision function that returns a
+// non-permanent (retryable) error for every true received on ch, and nil
+// for every false.
+func GenerateNonPernamentErrorUntil(ch chan bool) error {
+	code := codes.Unavailable
+	s := status.New(code, errNonPermanent.Error())
+	defaultReturn := s.Err()
+	if <-ch {
+		return defaultReturn
+	}
+	return nil
+}
+
 // randomPermanentError is a decision function that succeeds approximately
 // half of the time and fails with a permanent error the rest of the time.
 func RandomPermanentError() error {
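A sketch of how a test might drive this channel: keep injecting retryable
errors until the agent reports a full queue, then let batches through
(`WithDecisionFunc` is a hypothetical setter; the actual wiring lives in the
scenario code, which is not part of this diff):

```go
noErrors := make(chan bool)
mb.WithDecisionFunc(func() error {
	return testbed.GenerateNonPernamentErrorUntil(noErrors)
})
go func() {
	for {
		// true → return a retryable error; false → accept the batch.
		noErrors <- !tc.AgentLogsContains("sending_queue is full")
	}
}()
```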
6 changes: 6 additions & 0 deletions testbed/testbed/results.go
@@ -34,6 +34,12 @@ type benchmarkResult struct {
 	Extra string `json:"extra,omitempty"`
 }
 
+// LogPresentResults holds the outcome of a log-presence check.
+type LogPresentResults struct {
+	testName string
+	result   string
+	duration time.Duration
+}
+
 // PerformanceResults implements the TestResultsSummary interface with fields suitable for reporting
 // performance test results.
 type PerformanceResults struct {
34 changes: 34 additions & 0 deletions testbed/testbed/test_case.go
@@ -5,9 +5,11 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con
 
 import (
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"os"
+	"os/exec"
 	"path"
 	"path/filepath"
 	"testing"
@@ -322,3 +324,35 @@ func (tc *TestCase) logStatsOnce() {
 		tc.LoadGenerator.GetStats(),
 		tc.MockBackend.GetStats())
 }
+
+// AgentLogsContains searches agent.log for the given text (an extended
+// regular expression). It can be used to verify whether we've hit the
+// QueuedRetry sender or the memory limiter.
+func (tc *TestCase) AgentLogsContains(text string) bool {
+	filename := tc.ComposeTestResultFileName("agent.log")
+	cmd := exec.Command("cat", filename)
+	grep := exec.Command("grep", "-E", text)
+
+	pipe, err := cmd.StdoutPipe()
+	if err != nil {
+		log.Printf("Error while searching %s in %s", text, filename)
+		return false
+	}
+	defer func(pipe io.ReadCloser) {
+		if err := pipe.Close(); err != nil {
+			panic(err)
+		}
+	}(pipe)
+	grep.Stdin = pipe
+
+	if err = cmd.Start(); err != nil {
+		log.Print("Error while executing command: ", err.Error())
+		return false
+	}
+
+	res, _ := grep.Output()
+	return string(res) != ""
+}
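Shelling out to cat and grep ties the testbed to Unix-like hosts; a
dependency-free sketch of the same check using only the standard library
(assuming os and regexp are imported) would be:

```go
// agentLogsContain reports whether the agent log file matches the given
// regular expression pattern.
func agentLogsContain(filename, pattern string) bool {
	data, err := os.ReadFile(filename)
	if err != nil {
		log.Printf("Error while reading %s: %v", filename, err)
		return false
	}
	matched, err := regexp.Match(pattern, data)
	return err == nil && matched
}
```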
40 changes: 40 additions & 0 deletions testbed/testbed/validator.go
@@ -24,6 +24,46 @@ type TestCaseValidator interface {
 	RecordResults(tc *TestCase)
 }
 
+// LogPresentValidator checks whether a log message is present in (or, with
+// Present set to false, absent from) the agent logs.
+type LogPresentValidator struct {
+	LogBody string
+	Present bool
+}
+
+func (v *LogPresentValidator) Validate(tc *TestCase) {
+	logMsg := v.LogBody
+	var successMsg, errorMsg string
+	if v.Present {
+		successMsg = fmt.Sprintf("Log '%s' found", logMsg)
+		errorMsg = fmt.Sprintf("Log '%s' not found", logMsg)
+	} else {
+		successMsg = fmt.Sprintf("Log '%s' not found", logMsg)
+		errorMsg = fmt.Sprintf("Log '%s' found", logMsg)
+	}
+
+	if assert.True(tc.t, tc.AgentLogsContains(logMsg) == v.Present, errorMsg) {
+		log.Print(successMsg)
+	}
+}
+
+func (v *LogPresentValidator) RecordResults(tc *TestCase) {
+	var result string
+	if tc.t.Failed() {
+		result = "FAIL"
+	} else {
+		result = "PASS"
+	}
+
+	// Remove the "Test" prefix from the test name.
+	testName := tc.t.Name()[4:]
+
+	tc.resultsSummary.Add(tc.t.Name(), &LogPresentResults{
+		testName: testName,
+		result:   result,
+		duration: time.Since(tc.startTime),
+	})
+}
+
 // PerfTestValidator implements TestCaseValidator for test suites using PerformanceResults for summarizing results.
 type PerfTestValidator struct{}
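A sketch of how the queue-full test can plug this validator in (the value
mirrors the log message the scenario waits for; the exact call site is in the
scenario code, which is not part of this diff):

```go
validator := &testbed.LogPresentValidator{
	LogBody: "sending_queue is full",
	Present: true,
}
```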
61 changes: 61 additions & 0 deletions testbed/tests/log_test.go
@@ -162,3 +162,64 @@ func TestLog10kDPS(t *testing.T) {
 		})
 	}
 }
+
+func TestLogOtlpSendingQueue(t *testing.T) {
+	otlpreceiver10 := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
+	otlpreceiver10.WithRetry(`
+  retry_on_failure:
+    enabled: true
+`)
+	otlpreceiver10.WithQueue(`
+  sending_queue:
+    enabled: true
+    queue_size: 10
+`)
+	t.Run("OTLP-sending-queue-full", func(t *testing.T) {
+		ScenarioSendingQueuesFull(
+			t,
+			testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
+			otlpreceiver10,
+			testbed.LoadOptions{
+				DataItemsPerSecond: 100,
+				ItemsPerBatch:      10,
+				Parallel:           1,
+			},
+			testbed.ResourceSpec{
+				ExpectedMaxCPU: 80,
+				ExpectedMaxRAM: 120,
+			}, 10,
+			performanceResultsSummary,
+			nil,
+			nil)
+	})
+
+	otlpreceiver100 := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t))
+	otlpreceiver100.WithRetry(`
+  retry_on_failure:
+    enabled: true
+`)
+	otlpreceiver100.WithQueue(`
+  sending_queue:
+    enabled: true
+    queue_size: 100
+`)
+	t.Run("OTLP-sending-queue-not-full", func(t *testing.T) {
+		ScenarioSendingQueuesNotFull(
+			t,
+			testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
+			otlpreceiver100,
+			testbed.LoadOptions{
+				DataItemsPerSecond: 100,
+				ItemsPerBatch:      10,
+				Parallel:           1,
+			},
+			testbed.ResourceSpec{
+				ExpectedMaxCPU: 80,
+				ExpectedMaxRAM: 120,
+			}, 10,
+			performanceResultsSummary,
+			nil,
+			nil)
+	})
+}