Skip to content

Commit

Permalink
[chore][pkg/stanza] Move truncated file detection into reader (#27418)
Browse files Browse the repository at this point in the history
This reworks a recent change (see #27064) to how we handle rotated
files.

The solution depends upon re-reading a file's fingerprint. This is
unchanged, but most of the implementation is now localized to the reader
struct.
  • Loading branch information
djaglowski committed Oct 3, 2023
1 parent c63cd37 commit 49d4504
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
headerConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(int(c.FingerprintSize)),
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
Expand Down
20 changes: 20 additions & 0 deletions pkg/stanza/fileconsumer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,23 @@ func min0(a, b int) int {
}
return b
}

// validateFingerprint reports whether the reader's current file handle can
// still be trusted.
//
// A fresh fingerprint is built from the handle the reader already holds and is
// compared against the fingerprint stored on the reader. The handle is treated
// as valid only when the fresh fingerprint starts with the stored one
// (presumably meaning the content differs only by appended data; confirm
// against fingerprint.StartsWith).
//
// The result is false when the reader has no file handle, when a fresh
// fingerprint cannot be created (the error is logged at debug level), or when
// the fingerprints do not match.
//
// NOTE(review): this method only reports validity. It does not close the file
// or clear the handle itself; any cleanup is left to the caller.
func (r *reader) validateFingerprint() bool {
	// Without a handle there is nothing to re-read.
	if r.file == nil {
		return false
	}

	current, err := fingerprint.New(r.file, r.fingerprintSize)
	if err != nil {
		// A handle we cannot fingerprint is treated the same as an invalid one.
		r.Debugw("Failed to create fingerprint", zap.Error(err))
		return false
	}

	return current.StartsWith(r.Fingerprint)
}
24 changes: 6 additions & 18 deletions pkg/stanza/fileconsumer/roller_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,14 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto
import (
"context"
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
)

type detectLostFiles struct {
oldReaders []*reader
fingerprintSize int // used when we check for truncation
oldReaders []*reader
}

func newRoller(fingerprintSize int) roller {
return &detectLostFiles{
oldReaders: []*reader{},
fingerprintSize: fingerprintSize,
}
func newRoller() roller {
return &detectLostFiles{oldReaders: []*reader{}}
}

func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader) {
Expand All @@ -37,15 +31,9 @@ OUTER:
if oldReader.fileName == newReader.fileName {
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader.

// We can detect truncation by recreating a fingerprint from the old handle.
// If it matches the old fingerprint, then we know that the file was moved,
// so we can consider the file lost and continue reading from the old handle.
// If there's an error reading a new fingerprint from the old handle, let's assume we can't
// read the rest of it anyways.
refreshedFingerprint, err := fingerprint.New(oldReader.file, r.fingerprintSize)
if err == nil && !refreshedFingerprint.StartsWith(oldReader.Fingerprint) {
// file, in which case we should only read from it using the new reader. We can use
// the validateFingerprint method to establish that the file has not been truncated.
if !oldReader.validateFingerprint() {
continue OUTER
}
}
Expand Down

0 comments on commit 49d4504

Please sign in to comment.