Skip to content

Commit

Permalink
[chore][pkg/stanza] Replace rotation test (open-telemetry#31298)
Browse files Browse the repository at this point in the history
Resolves
open-telemetry#30836

Also removes a dependency on `observIQ/nanojack` by simplifying rotation
tests. We still test both move/create and copy/truncate strategies, but
the unit tests are managing rotated files manually. We are no longer
testing rotation based on different naming strategies since this is
effectively just a matching problem, which is well tested elsewhere.
  • Loading branch information
djaglowski committed Feb 21, 2024
1 parent be5986f commit fb87e84
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 302 deletions.
2 changes: 0 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions cmd/oteltestbedcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions exporter/datadogexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/internal/emittest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
case call := <-s.emitChan:
actual = append(actual, call.Token)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
assert.Fail(t, fmt.Sprintf("timeout: expected: %d, actual: %d", len(expected), i))
return
}
}
Expand All @@ -135,7 +135,7 @@ func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) {
case call := <-s.emitChan:
actual = append(actual, call)
case <-time.After(s.timeout):
assert.Fail(t, "Timed out waiting for message")
assert.Fail(t, fmt.Sprintf("timeout: expected: %d, actual: %d", len(expected), i))
return
}
}
Expand Down
260 changes: 19 additions & 241 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,101 +7,37 @@ import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
"sync"
"testing"
"time"

"github.com/observiq/nanojack"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

const windowsOS = "windows"

func TestMultiFileRotate(t *testing.T) {
func TestCopyTruncate(t *testing.T) {
if runtime.GOOS == windowsOS {
// Windows has very poor support for moving active files, so rotation is less commonly used
t.Skip()
}
t.Parallel()

getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) }

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)

numFiles := 3
numMessages := 3
numRotations := 3

expected := make([][]byte, 0, numFiles*numMessages*numRotations)
for i := 0; i < numFiles; i++ {
for j := 0; j < numMessages; j++ {
for k := 0; k < numRotations; k++ {
expected = append(expected, []byte(getMessage(i, k, j)))
}
}
}

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

temps := make([]*os.File, 0, numFiles)
for i := 0; i < numFiles; i++ {
temps = append(temps, filetest.OpenTemp(t, tempDir))
}

var wg sync.WaitGroup
for i, temp := range temps {
wg.Add(1)
go func(tf *os.File, f int) {
defer wg.Done()
for k := 0; k < numRotations; k++ {
for j := 0; j < numMessages; j++ {
filetest.WriteString(t, tf, getMessage(f, k, j)+"\n")
}

require.NoError(t, tf.Close())
require.NoError(t, os.Rename(tf.Name(), fmt.Sprintf("%s.%d", tf.Name(), k)))
tf = filetest.ReopenTemp(t, tf.Name())
}
}(temp, i)
}

sink.ExpectTokens(t, expected...)
wg.Wait()
}

func TestMultiFileRotateSlow(t *testing.T) {
if runtime.GOOS == windowsOS {
// Windows has very poor support for moving active files, so rotation is less commonly used
t.Skip()
t.Skip("Rotation tests have been flaky on Windows. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16331")
}

t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.PollInterval = 10 * time.Millisecond
operator, sink := testManager(t, cfg)

getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) }
fileName := func(f, k int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.rot%d.log", f, k)) }
baseFileName := func(f int) string { return filepath.Join(tempDir, fmt.Sprintf("file%d.log", f)) }

numFiles := 3
numMessages := 30
numMessages := 300
numRotations := 3

expected := make([][]byte, 0, numFiles*numMessages*numRotations)
Expand All @@ -124,15 +60,22 @@ func TestMultiFileRotateSlow(t *testing.T) {
go func(fn int) {
defer wg.Done()

file := filetest.OpenFile(t, baseFileName(fn))
for rotationNum := 0; rotationNum < numRotations; rotationNum++ {
file := filetest.OpenFile(t, baseFileName(fn))
for messageNum := 0; messageNum < numMessages; messageNum++ {
filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n")
time.Sleep(5 * time.Millisecond)
}

require.NoError(t, file.Close())
require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum)))
require.NoError(t, file.Sync())
_, err := file.Seek(0, 0)
require.NoError(t, err)
dst := filetest.OpenFile(t, fileName(fn, rotationNum))
_, err = io.Copy(dst, file)
require.NoError(t, err)
require.NoError(t, dst.Close())
require.NoError(t, file.Truncate(0))
_, err = file.Seek(0, 0)
require.NoError(t, err)
}
}(fileNum)
}
Expand All @@ -141,7 +84,7 @@ func TestMultiFileRotateSlow(t *testing.T) {
wg.Wait()
}

func TestMultiCopyTruncateSlow(t *testing.T) {
func TestMoveCreate(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Rotation tests have been flaky on Windows. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16331")
}
Expand Down Expand Up @@ -182,19 +125,10 @@ func TestMultiCopyTruncateSlow(t *testing.T) {
file := filetest.OpenFile(t, baseFileName(fn))
for messageNum := 0; messageNum < numMessages; messageNum++ {
filetest.WriteString(t, file, getMessage(fn, rotationNum, messageNum)+"\n")
time.Sleep(5 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}

_, err := file.Seek(0, 0)
require.NoError(t, err)
dst := filetest.OpenFile(t, fileName(fn, rotationNum))
_, err = io.Copy(dst, file)
require.NoError(t, err)
require.NoError(t, dst.Close())
require.NoError(t, file.Truncate(0))
_, err = file.Seek(0, 0)
require.NoError(t, err)
file.Close()
require.NoError(t, file.Close())
require.NoError(t, os.Rename(baseFileName(fn), fileName(fn, rotationNum)))
}
}(fileNum)
}
Expand All @@ -203,162 +137,6 @@ func TestMultiCopyTruncateSlow(t *testing.T) {
wg.Wait()
}

type rotationTest struct {
name string
totalLines int
maxLinesPerFile int
maxBackupFiles int
writeInterval time.Duration
pollInterval time.Duration
ephemeralLines bool
}

/*
When log files are rotated at extreme speeds, it is possible to miss some log entries.
This can happen when an individual log entry is written and deleted within the duration
of a single poll interval. For example, consider the following scenario:
- A log file may have up to 9 backups (10 total log files)
- Each log file may contain up to 10 entries
- Log entries are written at an interval of 10µs
- Log files are polled at an interval of 100ms
In this scenario, a log entry that is written may only exist on disk for about 1ms.
A polling interval of 100ms will most likely never produce a chance to read the log file.
In production settings, this consideration is not very likely to be a problem, but it is
easy to encounter the issue in tests, and difficult to deterministically simulate edge cases.
However, the above understanding does allow for some consistent expectations.
1. Cases that do not require deletion of old log entries should always pass.
2. Cases where the polling interval is sufficiently rapid should always pass.
3. When neither 1 nor 2 is true, there may be missing entries, but still no duplicates.
The following method is provided largely as documentation of how this is expected to behave.
In practice, timing is largely dependent on the responsiveness of system calls.
*/
func (rt rotationTest) expectEphemeralLines() bool {
// primary + backups
maxLinesInAllFiles := rt.maxLinesPerFile + rt.maxLinesPerFile*rt.maxBackupFiles

// Will the test write enough lines to result in deletion of oldest backups?
maxBackupsExceeded := rt.totalLines > maxLinesInAllFiles

// last line written in primary file will exist for l*b more writes
minTimeToLive := time.Duration(int(rt.writeInterval) * rt.maxLinesPerFile * rt.maxBackupFiles)

// can a line be written and then rotated to deletion before ever observed?
return maxBackupsExceeded && rt.pollInterval > minTimeToLive
}

func (rt rotationTest) run(tc rotationTest, copyTruncate, sequential bool) func(t *testing.T) {
return func(t *testing.T) {

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.PollInterval = tc.pollInterval
sink := emittest.NewSink(emittest.WithCallBuffer(tc.totalLines))
operator := testManagerWithSink(t, cfg, sink)

file, err := os.CreateTemp(tempDir, "")
require.NoError(t, err)
require.NoError(t, file.Close()) // will be managed by rotator

rotator := nanojack.Logger{
Filename: file.Name(),
MaxLines: tc.maxLinesPerFile,
MaxBackups: tc.maxBackupFiles,
CopyTruncate: copyTruncate,
Sequential: sequential,
}
t.Cleanup(func() { _ = rotator.Close() })

logger := log.New(&rotator, "", 0)

expected := make([][]byte, 0, tc.totalLines)
baseStr := string(filetest.TokenWithLength(46)) // + ' 123'
for i := 0; i < tc.totalLines; i++ {
expected = append(expected, []byte(fmt.Sprintf("%s %3d", baseStr, i)))
}

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

for _, message := range expected {
logger.Println(string(message))
time.Sleep(tc.writeInterval)
}

received := make([][]byte, 0, tc.totalLines)
for i := 0; i < tc.totalLines; i++ {
received = append(received, sink.NextToken(t))
}

if tc.ephemeralLines {
if !tc.expectEphemeralLines() {
// This is helpful for test development, and ensures the sample computation is used
t.Logf("Potentially unstable ephemerality expectation for test: %s", tc.name)
}
require.Subset(t, expected, received)
} else {
require.ElementsMatch(t, expected, received)
}
}
}

func TestRotation(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Rotation tests have been flaky on Windows. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16331")
}
cases := []rotationTest{
{
name: "NoRotation",
totalLines: 10,
maxLinesPerFile: 10,
maxBackupFiles: 1,
writeInterval: time.Millisecond,
pollInterval: 10 * time.Millisecond,
},
{
name: "NoDeletion",
totalLines: 20,
maxLinesPerFile: 10,
maxBackupFiles: 1,
writeInterval: time.Millisecond,
pollInterval: 10 * time.Millisecond,
},
{
name: "Deletion",
totalLines: 30,
maxLinesPerFile: 10,
maxBackupFiles: 1,
writeInterval: time.Millisecond,
pollInterval: 10 * time.Millisecond,
ephemeralLines: true,
},
{
name: "Deletion/ExceedFingerprint",
totalLines: 300,
maxLinesPerFile: 100,
maxBackupFiles: 1,
writeInterval: time.Millisecond,
pollInterval: 10 * time.Millisecond,
ephemeralLines: true,
},
}

for _, tc := range cases {
if runtime.GOOS != windowsOS {
// Windows has very poor support for moving active files, so rotation is less commonly used
t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false))
t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true))
}
t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false))
t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true))
}
}

func TestMoveFile(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Moving files while open is unsupported on Windows")
Expand Down
1 change: 0 additions & 1 deletion pkg/stanza/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
github.com/haimrubinstein/go-syslog/v3 v3.0.0
github.com/jpillora/backoff v1.0.0
github.com/json-iterator/go v1.1.12
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.95.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.95.0
github.com/stretchr/testify v1.8.4
Expand Down
Loading

0 comments on commit fb87e84

Please sign in to comment.