From 2fe33d3d3022f4c60ba116a02fdd18a20adc1f7e Mon Sep 17 00:00:00 2001 From: Olga <86965961+omrozowicz-splunk@users.noreply.github.com> Date: Thu, 4 Jan 2024 16:16:58 +0100 Subject: [PATCH] [chore] testbed: sending queue otlp (#28904) **Description:** This PR adds two tests with an otlp receiver and exporter with sending queues. We have two scenarios here: Sending queue full 1. We generate permanent errors until the `sending_queue is full` log appears in the agent's logs 2. Then we get IDs of logs meant to be retried and IDs of logs received successfully and check if all of them were retried The current testbed is unable to get the information about the errors from load generator's perspective, so I needed to put `LogsToRetry` in `mock_backend` to be able to track what logs suffered from a permanent error. Sending queue not full A sanity test to check the default behavior of the sending queue, but without making it full. So far only log sending queues are covered; I'm not sure if we should add it for every data type. Currently, one test takes about ~9s. 
**Link to tracking Issue:** A related issue is this one: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/20552, as these tests cover `Exporter helper QueuedRetry queue limit size is hit.` scenario --------- Co-authored-by: Pablo Baeyens --- testbed/testbed/mock_backend.go | 24 ++++- testbed/testbed/results.go | 6 ++ testbed/testbed/test_case.go | 34 +++++++ testbed/testbed/validator.go | 40 ++++++++ testbed/tests/log_test.go | 61 ++++++++++++ testbed/tests/scenarios.go | 170 ++++++++++++++++++++++++++++++++ 6 files changed, 331 insertions(+), 4 deletions(-) diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index e6e932985633..141d2033e193 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -56,6 +56,8 @@ type MockBackend struct { DroppedMetrics []pmetric.Metrics DroppedLogs []plog.Logs + LogsToRetry []plog.Logs + // decision to return permanent/non-permanent errors decision decisionFunc } @@ -166,8 +168,6 @@ func (mb *MockBackend) ConsumeMetric(md pmetric.Metrics) { var _ consumer.Traces = (*MockTraceConsumer)(nil) func (mb *MockBackend) ConsumeLogs(ld plog.Logs) { - mb.recordMutex.Lock() - defer mb.recordMutex.Unlock() if mb.isRecording { mb.ReceivedLogs = append(mb.ReceivedLogs, ld) } @@ -269,9 +269,15 @@ func (lc *MockLogConsumer) Capabilities() consumer.Capabilities { } func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld plog.Logs) error { + lc.backend.recordMutex.Lock() + defer lc.backend.recordMutex.Unlock() if err := lc.backend.decision(); err != nil { - if consumererror.IsPermanent(err) && lc.backend.isRecording { - lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld) + if lc.backend.isRecording { + if consumererror.IsPermanent(err) { + lc.backend.DroppedLogs = append(lc.backend.DroppedLogs, ld) + } else { + lc.backend.LogsToRetry = append(lc.backend.LogsToRetry, ld) + } } return err } @@ -293,6 +299,16 @@ func RandomNonPermanentError() error { return nil 
} +func GenerateNonPernamentErrorUntil(ch chan bool) error { + code := codes.Unavailable + s := status.New(code, errNonPermanent.Error()) + defaultReturn := s.Err() + if <-ch { + return defaultReturn + } + return nil +} + // randomPermanentError is a decision function that succeeds approximately // half of the time and fails with a permanent error the rest of the time. func RandomPermanentError() error { diff --git a/testbed/testbed/results.go b/testbed/testbed/results.go index 0e7b03599a50..e25a822f2895 100644 --- a/testbed/testbed/results.go +++ b/testbed/testbed/results.go @@ -34,6 +34,12 @@ type benchmarkResult struct { Extra string `json:"extra,omitempty"` } +type LogPresentResults struct { + testName string + result string + duration time.Duration +} + // PerformanceResults implements the TestResultsSummary interface with fields suitable for reporting // performance test results. type PerformanceResults struct { diff --git a/testbed/testbed/test_case.go b/testbed/testbed/test_case.go index 2bed52bb39be..a87a1de963d3 100644 --- a/testbed/testbed/test_case.go +++ b/testbed/testbed/test_case.go @@ -5,9 +5,11 @@ package testbed // import "github.com/open-telemetry/opentelemetry-collector-con import ( "fmt" + "io" "log" "net" "os" + "os/exec" "path" "path/filepath" "testing" @@ -322,3 +324,35 @@ func (tc *TestCase) logStatsOnce() { tc.LoadGenerator.GetStats(), tc.MockBackend.GetStats()) } + +// Used to search for text in agent.log +// It can be used to verify if we've hit QueuedRetry sender or memory limiter +func (tc *TestCase) AgentLogsContains(text string) bool { + filename := tc.ComposeTestResultFileName("agent.log") + cmd := exec.Command("cat", filename) + grep := exec.Command("grep", "-E", text) + + pipe, err := cmd.StdoutPipe() + defer func(pipe io.ReadCloser) { + err = pipe.Close() + if err != nil { + panic(err) + } + }(pipe) + grep.Stdin = pipe + + if err != nil { + log.Printf("Error while searching %s in %s", text, 
tc.ComposeTestResultFileName("agent.log")) + return false + } + + err = cmd.Start() + if err != nil { + log.Print("Error while executing command: ", err.Error()) + return false + } + + res, _ := grep.Output() + return string(res) != "" + +} diff --git a/testbed/testbed/validator.go b/testbed/testbed/validator.go index b22d10200e2d..d7cc2fcfb48c 100644 --- a/testbed/testbed/validator.go +++ b/testbed/testbed/validator.go @@ -24,6 +24,46 @@ type TestCaseValidator interface { RecordResults(tc *TestCase) } +type LogPresentValidator struct { + LogBody string + Present bool +} + +func (v *LogPresentValidator) Validate(tc *TestCase) { + logMsg := v.LogBody + var successMsg, errorMsg string + if v.Present { + successMsg = fmt.Sprintf("Log '%s' found", logMsg) + errorMsg = fmt.Sprintf("Log '%s' not found", logMsg) + } else { + errorMsg = fmt.Sprintf("Log '%s' found", logMsg) + successMsg = fmt.Sprintf("Log '%s' not found", logMsg) + } + + if assert.True(tc.t, tc.AgentLogsContains(logMsg) == v.Present, errorMsg) { + log.Print(successMsg) + } +} + +func (v *LogPresentValidator) RecordResults(tc *TestCase) { + + var result string + if tc.t.Failed() { + result = "FAIL" + } else { + result = "PASS" + } + + // Remove "Test" prefix from test name. + testName := tc.t.Name()[4:] + + tc.resultsSummary.Add(tc.t.Name(), &LogPresentResults{ + testName: testName, + result: result, + duration: time.Since(tc.startTime), + }) +} + // PerfTestValidator implements TestCaseValidator for test suites using PerformanceResults for summarizing results. 
type PerfTestValidator struct{} diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 2857c50e3e99..aa22c123095b 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -162,3 +162,64 @@ func TestLog10kDPS(t *testing.T) { }) } } + +func TestLogOtlpSendingQueue(t *testing.T) { + otlpreceiver10 := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + otlpreceiver10.WithRetry(` + retry_on_failure: + enabled: true +`) + otlpreceiver10.WithQueue(` + sending_queue: + enabled: true + queue_size: 10 +`) + t.Run("OTLP-sending-queue-full", func(t *testing.T) { + ScenarioSendingQueuesFull( + t, + testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + otlpreceiver10, + testbed.LoadOptions{ + DataItemsPerSecond: 100, + ItemsPerBatch: 10, + Parallel: 1, + }, + testbed.ResourceSpec{ + ExpectedMaxCPU: 80, + ExpectedMaxRAM: 120, + }, 10, + performanceResultsSummary, + nil, + nil) + }) + + otlpreceiver100 := testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)) + otlpreceiver100.WithRetry(` + retry_on_failure: + enabled: true +`) + otlpreceiver10.WithQueue(` + sending_queue: + enabled: true + queue_size: 100 +`) + t.Run("OTLP-sending-queue-not-full", func(t *testing.T) { + ScenarioSendingQueuesNotFull( + t, + testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)), + otlpreceiver100, + testbed.LoadOptions{ + DataItemsPerSecond: 100, + ItemsPerBatch: 10, + Parallel: 1, + }, + testbed.ResourceSpec{ + ExpectedMaxCPU: 80, + ExpectedMaxRAM: 120, + }, 10, + performanceResultsSummary, + nil, + nil) + }) + +} diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index e61f1032064f..6126cda9640c 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -12,9 +12,11 @@ import ( "path" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" 
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) @@ -369,6 +371,141 @@ func ScenarioTestTraceNoBackend10kSPS( assert.Less(t, configuration.ExpectedMinFinalRAM, rss) } +func ScenarioSendingQueuesFull( + t *testing.T, + sender testbed.DataSender, + receiver testbed.DataReceiver, + loadOptions testbed.LoadOptions, + resourceSpec testbed.ResourceSpec, + sleepTime int, + resultsSummary testbed.TestResultsSummary, + processors map[string]string, + extensions map[string]string, +) { + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + agentProc := testbed.NewChildProcessCollector() + + configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + dataProvider := testbed.NewPerfTestDataProvider(loadOptions) + dataChannel := make(chan bool) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.LogPresentValidator{ + LogBody: "sending_queue is full", + Present: true, + }, + resultsSummary, + testbed.WithResourceLimits(resourceSpec), + testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), + ) + + tc.MockBackend.EnableRecording() + defer tc.Stop() + + tc.StartBackend() + tc.StartAgent() + + tc.StartLoad(loadOptions) + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, time.Second*time.Duration(sleepTime), "load generator started") + + // searchFunc checks for "sending_queue is full" communicate and sends the signal to GenerateNonPernamentErrorUntil + // to generate only successes from that time on + tc.WaitForN(func() bool { + logFound := tc.AgentLogsContains("sending_queue is full") + if !logFound { + dataChannel <- true + return false + } + tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == 0 }, "no data successfully received before an 
error") + close(dataChannel) + return logFound + }, time.Second*time.Duration(sleepTime), "sending_queue errors present") + + // check if data started to be received successfully + tc.WaitForN(func() bool { + return tc.MockBackend.DataItemsReceived() > 0 + }, time.Second*time.Duration(sleepTime), "data started to be successfully received") + + tc.WaitForN(func() bool { + // get IDs from logs to retry + logsToRetry := getLogsID(tc.MockBackend.LogsToRetry) + + // get IDs from logs received successfully + successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs) + + // check if all the logs to retry were actually retried + logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs) + return logsWereRetried + }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") + + tc.StopLoad() + tc.StopAgent() + tc.ValidateData() +} + +func ScenarioSendingQueuesNotFull( + t *testing.T, + sender testbed.DataSender, + receiver testbed.DataReceiver, + loadOptions testbed.LoadOptions, + resourceSpec testbed.ResourceSpec, + sleepTime int, + resultsSummary testbed.TestResultsSummary, + processors map[string]string, + extensions map[string]string, +) { + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + agentProc := testbed.NewChildProcessCollector() + + configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + dataProvider := testbed.NewPerfTestDataProvider(loadOptions) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.LogPresentValidator{ + LogBody: "sending_queue is full", + Present: false, + }, + resultsSummary, + testbed.WithResourceLimits(resourceSpec), + ) + defer tc.Stop() + + tc.StartBackend() + tc.StartAgent() + + tc.StartLoad(loadOptions) + + tc.Sleep(time.Second * time.Duration(sleepTime)) + + tc.WaitFor(func() bool { 
return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), + "all spans received") + + tc.StopLoad() + tc.StopAgent() + tc.ValidateData() +} + func constructLoadOptions(test TestCase) testbed.LoadOptions { options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10} options.Attributes = make(map[string]string) @@ -380,3 +517,36 @@ func constructLoadOptions(test TestCase) testbed.LoadOptions { } return options } + +func getLogsID(logToRetry []plog.Logs) []string { + var result []string + for _, logElement := range logToRetry { + logRecord := logElement.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() + for index := 0; index < logRecord.Len(); index++ { + logObj := logRecord.At(index) + itemIndex, _ := logObj.Attributes().Get("item_index") + batchIndex, _ := logObj.Attributes().Get("batch_index") + result = append(result, fmt.Sprintf("%s%s", batchIndex.AsString(), itemIndex.AsString())) + } + } + return result +} + +func allElementsExistInSlice(slice1, slice2 []string) bool { + // Create a map to store elements of slice2 for efficient lookup + elementMap := make(map[string]bool) + + // Populate the map with elements from slice2 + for _, element := range slice2 { + elementMap[element] = true + } + + // Check if all elements of slice1 exist in slice2 + for _, element := range slice1 { + if _, exists := elementMap[element]; !exists { + return false + } + } + + return true +}