Skip to content

Commit

Permalink
[pkg/stanza] Fix issue where files were unnecessarily kept open on wi…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Nov 28, 2023
1 parent ab2c422 commit b8100ca
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fileconsumer-fix-windows.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where files were unnecessarily kept open on Windows

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29149]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 3 additions & 6 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (m *Manager) closePreviousFiles() {
for _, r := range m.previousPollFiles {
m.knownFiles = append(m.knownFiles, r.Close())
}
m.previousPollFiles = nil
}

// Stop will stop the file monitoring process
Expand Down Expand Up @@ -159,11 +160,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files", zap.Strings("paths", paths))
readers := m.makeReaders(paths)

// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
m.readLostFiles(ctx, readers)
m.closePreviousFiles()
m.preConsume(ctx, readers)

// read new readers to end
var wg sync.WaitGroup
Expand All @@ -176,7 +173,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.previousPollFiles = readers
m.postConsume(readers)
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
Expand Down
13 changes: 11 additions & 2 deletions pkg/stanza/fileconsumer/file_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
// Detect files that have been rotated out of matching pattern
// Take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) {
lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))
OUTER:
for _, oldReader := range m.previousPollFiles {
Expand Down Expand Up @@ -48,3 +50,10 @@ OUTER:
}
lostWG.Wait()
}

// On non-windows platforms, we keep files open between poll cycles so that we can detect
// and read "lost" files, which have been moved out of the matching pattern.
func (m *Manager) postConsume(readers []*reader.Reader) {
m.closePreviousFiles()
m.previousPollFiles = readers
}
25 changes: 24 additions & 1 deletion pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) {

operator, emitChan := buildTestManager(t, cfg)
operator.persister = testutil.NewUnscopedMockPersister()
operator.movingAverageMatches = 10

temps := make([]*os.File, 0, initFiles+moreFiles)
for i := 0; i < initFiles; i++ {
Expand Down Expand Up @@ -1662,7 +1663,10 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
waitForToken(t, emitCalls, []byte(content))
expectNoTokens(t, emitCalls)
operator.wg.Wait()
require.Len(t, operator.previousPollFiles, 1)
if runtime.GOOS != "windows" {
// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
require.Len(t, operator.previousPollFiles, 1)
}

// keep append data to file1 and file2
newContent := "bbbbbbbbbbbb"
Expand All @@ -1675,3 +1679,22 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
waitForTokens(t, emitCalls, []byte(content), []byte(newContent1), []byte(newContent))
operator.wg.Wait()
}

func TestWindowsFilesClosedImmediately(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog\n")
require.NoError(t, temp.Close())

operator.poll(context.Background())
waitForToken(t, emitCalls, []byte("testlog"))

// On Windows, poll should close the file after reading it. We can test this by trying to move it.
require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed"))
}
8 changes: 7 additions & 1 deletion pkg/stanza/fileconsumer/file_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
func (m *Manager) preConsume(ctx context.Context, newReaders []*reader.Reader) {
return
}

// On windows, we close files immediately after reading becauase they cannot be moved while open.
func (m *Manager) postConsume(readers []*reader.Reader) {
m.previousPollFiles = readers
m.closePreviousFiles()
}

0 comments on commit b8100ca

Please sign in to comment.