Skip to content

Commit

Permalink
[chore][pkg/stanza] Simplify delete_after_read handling of deleted fi…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 4, 2023
1 parent de4b2b6 commit f0c2cf4
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 25 deletions.
16 changes: 0 additions & 16 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

// Save off any files that were not fully read
if m.deleteAfterRead {
unfinished := make([]*reader.Reader, 0, len(readers))
for _, r := range readers {
if !r.EOF {
unfinished = append(unfinished, r)
}
}
readers = unfinished

// If all files were read and deleted then no need to do bookkeeping on readers
if len(readers) == 0 {
return
}
}

// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true

Expand Down
10 changes: 1 addition & 9 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package fileconsumer

import (
"bytes"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -1384,7 +1383,7 @@ func TestDeleteAfterRead(t *testing.T) {
cfg.DeleteAfterRead = true
emitCalls := make(chan *emitParams, totalLines)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))

operator.persister = testutil.NewMockPersister("test")
operator.poll(context.Background())
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...)

Expand Down Expand Up @@ -1518,7 +1517,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
bytesPerLine := 100
shortFileLine := tokenWithLength(bytesPerLine - 1)
longFileLines := 100000
longFileSize := longFileLines * bytesPerLine
longFileFirstLine := "first line of long file\n"

require.NoError(t, featuregate.GlobalRegistry().Set(allowFileDeletion.ID(), true))
Expand Down Expand Up @@ -1580,12 +1578,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {

// long file was partially consumed and should NOT have been deleted
require.FileExists(t, longFile.Name())

// Verify that only long file is remembered and that (0 < offset < fileSize)
require.Equal(t, 1, len(operator.knownFiles))
reader := operator.knownFiles[0]
require.True(t, bytes.HasPrefix(reader.Fingerprint.FirstBytes, []byte(longFileFirstLine)))
require.Less(t, reader.Offset, int64(longFileSize))
}

func TestHeaderPersistance(t *testing.T) {
Expand Down

0 comments on commit f0c2cf4

Please sign in to comment.