Skip to content

Commit

Permalink
[pkg/stanza] Skip rotated file handles if truncated (#27064)
Browse files Browse the repository at this point in the history
Fixes
#27037
  • Loading branch information
djaglowski committed Sep 22, 2023
1 parent 52f81a8 commit 844f49b
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-lost-truncate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where truncated file could be read incorrectly.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27037]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
headerConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
roller: newRoller(int(c.FingerprintSize)),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
Expand Down
6 changes: 3 additions & 3 deletions pkg/stanza/fileconsumer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type reader struct {
*readerConfig
*readerMetadata
file *os.File
fileName string
lineSplitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
decoder *decode.Decoder
Expand Down Expand Up @@ -129,10 +130,9 @@ func (r *reader) Delete() {
if r.file == nil {
return
}
f := r.file
r.Close()
if err := os.Remove(f.Name()); err != nil {
r.Errorf("could not delete %s", f.Name())
if err := os.Remove(r.fileName); err != nil {
r.Errorf("could not delete %s", r.fileName)
}
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (f *readerFactory) build(file *os.File, m *readerMetadata, lineSplitFunc bu
readerConfig: f.readerConfig,
readerMetadata: m,
file: file,
fileName: file.Name(),
SugaredLogger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.encoding),
lineSplitFunc: lineSplitFunc,
Expand All @@ -82,12 +83,12 @@ func (f *readerFactory) build(file *os.File, m *readerMetadata, lineSplitFunc bu
}

// Resolve file name and path attributes
resolved := file.Name()
resolved := r.fileName

// Dirty solution, waiting for this permanent fix https://github.com/golang/go/issues/39786
// EvalSymlinks on windows is partially working depending on the way you use Symlinks and Junctions
if runtime.GOOS != "windows" {
resolved, err = filepath.EvalSymlinks(file.Name())
resolved, err = filepath.EvalSymlinks(r.fileName)
if err != nil {
f.Errorf("resolve symlinks: %w", err)
}
Expand All @@ -98,12 +99,12 @@ func (f *readerFactory) build(file *os.File, m *readerMetadata, lineSplitFunc bu
}

if f.readerConfig.includeFileName {
r.FileAttributes[logFileName] = filepath.Base(file.Name())
r.FileAttributes[logFileName] = filepath.Base(r.fileName)
} else if r.FileAttributes[logFileName] != nil {
delete(r.FileAttributes, logFileName)
}
if f.readerConfig.includeFilePath {
r.FileAttributes[logFilePath] = file.Name()
r.FileAttributes[logFilePath] = r.fileName
} else if r.FileAttributes[logFilePath] != nil {
delete(r.FileAttributes, logFilePath)
}
Expand Down
27 changes: 24 additions & 3 deletions pkg/stanza/fileconsumer/roller_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto
import (
"context"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
)

type detectLostFiles struct {
oldReaders []*reader
oldReaders []*reader
fingerprintSize int // used when we check for truncation
}

func newRoller() roller {
return &detectLostFiles{[]*reader{}}
func newRoller(fingerprintSize int) roller {
return &detectLostFiles{
oldReaders: []*reader{},
fingerprintSize: fingerprintSize,
}
}

func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader) {
Expand All @@ -28,6 +34,21 @@ OUTER:
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
if oldReader.fileName == newReader.fileName {
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader.

// We can detect truncation by recreating a fingerprint from the old handle.
// If it matches the old fingerprint, then we know that the file was moved,
// so we can consider the file lost and continue reading from the old handle.
// If there's an error reading a new fingerprint from the old handle, let's assume we can't
// read the rest of it anyways.
refreshedFingerprint, err := fingerprint.New(oldReader.file, r.fingerprintSize)
if err == nil && !refreshedFingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
}
}
lostReaders = append(lostReaders, oldReader)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/roller_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import "context"

type closeImmediately struct{}

func newRoller() roller {
func newRoller(_ int) roller {
return &closeImmediately{}
}

Expand Down
84 changes: 80 additions & 4 deletions pkg/stanza/fileconsumer/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,10 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) {
originalFile.Close()

newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/")
err := os.Mkdir(newDir, 0777)
require.NoError(t, err)
require.NoError(t, os.Mkdir(newDir, 0777))
movedFileName := fmt.Sprintf("%s%s", newDir, "newfile.log")

err = os.Rename(orginalName, movedFileName)
require.NoError(t, err)
require.NoError(t, os.Rename(orginalName, movedFileName))

newFile, err := os.OpenFile(orginalName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
require.NoError(t, err)
Expand All @@ -463,6 +461,84 @@ func TestTrackRotatedFilesLogOrder(t *testing.T) {
waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2"), []byte("testlog3")})
}

// When a file it rotated out of pattern via move/create, we should
// detect that our old handle is still valid attempt to read from it.
func TestRotatedOutOfPatternMoveCreate(t *testing.T) {
if runtime.GOOS == windowsOS {
t.Skip("Moving files while open is unsupported on Windows")
}
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig()
cfg.Include = append(cfg.Include, fmt.Sprintf("%s/*.log1", tempDir))
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

originalFile := openTempWithPattern(t, tempDir, "*.log1")
originalFileName := originalFile.Name()

writeString(t, originalFile, "testlog1\n")
operator.poll(context.Background())
waitForToken(t, emitCalls, []byte("testlog1"))

// write more log, before next poll() begins
writeString(t, originalFile, "testlog2\n")

// move the file so it no longer matches
require.NoError(t, originalFile.Close())
require.NoError(t, os.Rename(originalFileName, originalFileName+".old"))

newFile := openFile(t, originalFileName)
_, err := newFile.Write([]byte("testlog4\ntestlog5\n"))
require.NoError(t, err)

// poll again
operator.poll(context.Background())

// expect remaining log from old file as well as all from new file
waitForTokens(t, emitCalls, [][]byte{[]byte("testlog2"), []byte("testlog4"), []byte("testlog5")})
}

// When a file it rotated out of pattern via copy/truncate, we should
// detect that our old handle is stale and not attempt to read from it.
func TestRotatedOutOfPatternCopyTruncate(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig()
cfg.Include = append(cfg.Include, fmt.Sprintf("%s/*.log1", tempDir))
cfg.StartAt = "beginning"
operator, emitCalls := buildTestManager(t, cfg)
operator.persister = testutil.NewMockPersister("test")

originalFile := openTempWithPattern(t, tempDir, "*.log1")
writeString(t, originalFile, "testlog1\n")
operator.poll(context.Background())
waitForToken(t, emitCalls, []byte("testlog1"))

// write more log, before next poll() begins
writeString(t, originalFile, "testlog2\n")
// copy the file to another file i.e. rotate, out of pattern
newFile := openTempWithPattern(t, tempDir, "*.log2")
_, err := originalFile.Seek(0, 0)
require.NoError(t, err)
_, err = io.Copy(newFile, originalFile)
require.NoError(t, err)

_, err = originalFile.Seek(0, 0)
require.NoError(t, err)
require.NoError(t, originalFile.Truncate(0))
_, err = originalFile.Write([]byte("testlog4\ntestlog5\n"))
require.NoError(t, err)

// poll again
operator.poll(context.Background())

waitForTokens(t, emitCalls, [][]byte{[]byte("testlog4"), []byte("testlog5")})
}

// TruncateThenWrite tests that, after a file has been truncated,
// any new writes are picked up
func TestTruncateThenWrite(t *testing.T) {
Expand Down

0 comments on commit 844f49b

Please sign in to comment.