Skip to content

Commit

Permalink
[pkg/stanza] Fix flaky TestDeleteAfterRead_SkipPartials (open-telemet…
Browse files Browse the repository at this point in the history
…ry#29597)

Fixes open-telemetry#27845 

The failure mechanism was very subtle. The test waits for the contents
of the short file to be emitted before canceling the context which is
used in the file reader. However, the reader may require one additional
scan to determine that it's reached the end of the file, and therefore
in a tiny proportion of cases, the context cancelation could actually
trigger the reader to return before it had determined it should delete
the file. The solution involves validating file deletion before
canceling the context. I was able to reproduce failure locally only
about 1/3000 times, but with this change was able to run 30,000 times
without a failure. The PR also cleans up a few related parts of the
code.
  • Loading branch information
djaglowski committed Dec 1, 2023
1 parent f369f60 commit 03ad502
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
26 changes: 13 additions & 13 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,10 +1494,8 @@ func TestReadExistingLogsWithHeader(t *testing.T) {
}

func TestDeleteAfterRead_SkipPartials(t *testing.T) {
bytesPerLine := 100
shortFileLine := tokenWithLength(bytesPerLine - 1)
shortFileLine := "short file line"
longFileLines := 100000
longFileFirstLine := "first line of long file\n"

require.NoError(t, featuregate.GlobalRegistry().Set(allowFileDeletion.ID(), true))
defer func() {
Expand All @@ -1513,15 +1511,13 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
operator.persister = testutil.NewUnscopedMockPersister()

shortFile := openTemp(t, tempDir)
_, err := shortFile.WriteString(string(shortFileLine) + "\n")
_, err := shortFile.WriteString(shortFileLine + "\n")
require.NoError(t, err)
require.NoError(t, shortFile.Close())

longFile := openTemp(t, tempDir)
_, err = longFile.WriteString(longFileFirstLine)
require.NoError(t, err)
for line := 0; line < longFileLines; line++ {
_, err := longFile.WriteString(string(tokenWithLength(bytesPerLine-1)) + "\n")
_, err := longFile.WriteString(string(tokenWithLength(100)) + "\n")
require.NoError(t, err)
}
require.NoError(t, longFile.Close())
Expand All @@ -1541,22 +1537,26 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
operator.poll(ctx)
}()

for !(shortOne && longOne) {
if line := waitForEmit(t, emitCalls); string(line.token) == string(shortFileLine) {
for !shortOne || !longOne {
if line := waitForEmit(t, emitCalls); string(line.token) == shortFileLine {
shortOne = true
} else {
longOne = true
}
}

// Short file was fully consumed and should eventually be deleted.
// Enforce assertion before canceling because EOF is not necessarily detected
// immediately when the token is emitted. An additional scan may be necessary.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.NoFileExists(c, shortFile.Name())
}, 100*time.Millisecond, time.Millisecond)

// Stop consuming before long file has been fully consumed
cancel()
wg.Wait()

// short file was fully consumed and should have been deleted
require.NoFileExists(t, shortFile.Name())

// long file was partially consumed and should NOT have been deleted
// Long file was partially consumed and should NOT have been deleted.
require.FileExists(t, longFile.Name())
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
s := scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)

// Iterate over the tokenized file, emitting entries as we go
var eof bool
for {
select {
case <-ctx.Done():
Expand All @@ -92,10 +91,10 @@ func (r *Reader) ReadToEnd(ctx context.Context) {

ok := s.Scan()
if !ok {
if err := s.Error(); err == nil {
eof = true
} else {
if err := s.Error(); err != nil {
r.logger.Errorw("Failed during scan", zap.Error(err))
} else if r.DeleteAtEOF {
r.delete()
}
break
}
Expand All @@ -122,12 +121,8 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
r.logger.Errorw("process: %w", zap.Error(err))
}
}

r.Offset = s.Pos()
}
if eof && r.DeleteAtEOF {
r.delete()
}
}

func (r *Reader) finalizeHeader() {
Expand Down

0 comments on commit 03ad502

Please sign in to comment.