Skip to content

Commit

Permalink
[pkg/stanza] Major overhaul of how we manage readers
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 20, 2023
1 parent 02cfaa0 commit 01f939b
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 163 deletions.
14 changes: 7 additions & 7 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
TrimFunc: trimFunc,
HeaderConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
}, nil
}

Expand Down
75 changes: 34 additions & 41 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

knownFiles []*reader.Reader
seenPaths map[string]struct{}
previousPollFiles []*reader.Reader
knownFiles []*reader.Metadata
seenPaths map[string]struct{}

currentFps []*fingerprint.Fingerprint
}
Expand All @@ -52,9 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error {
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
m.knownFiles = append(m.knownFiles, offsets...)
}

if _, err := m.fileMatcher.MatchFiles(); err != nil {
Expand All @@ -71,10 +69,6 @@ func (m *Manager) Start(persister operator.Persister) error {
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
m.roller.cleanup()
for _, reader := range m.knownFiles {
reader.Close()
}
m.cancel = nil
return nil
}
Expand Down Expand Up @@ -144,8 +138,16 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
m.roller.readLostFiles(ctx, readers)
m.readLostFiles(ctx, readers)
forgetNum := len(m.knownFiles) + len(m.previousPollFiles) - cap(m.knownFiles)
if forgetNum > 0 {
m.knownFiles = m.knownFiles[forgetNum:]
}
for _, oldReader := range m.previousPollFiles {
m.knownFiles = append(m.knownFiles, oldReader.Close())
}

// read new readers to end
var wg sync.WaitGroup
for _, r := range readers {
wg.Add(1)
Expand All @@ -156,17 +158,18 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.roller.roll(ctx, readers)
m.saveCurrent(readers)

rmds := make([]*reader.Metadata, 0, len(readers))
// Checkpoint all known offsets. We don't want to add metadata to knownFiles
// until those files are closed, but we do want to save them in case of a restart.
rmds := make([]*reader.Metadata, 0, len(m.knownFiles)+len(readers))
rmds = append(rmds, m.knownFiles...)
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
}

m.previousPollFiles = readers
m.clearCurrentFingerprints()
}

Expand Down Expand Up @@ -244,38 +247,28 @@ func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*fingerprint.Fingerprint, 0)
}

// saveCurrent adds the readers from this polling interval to this list of
// known files, then increments the generation of all tracked old readers
// before clearing out readers that have existed for 3 generations.
func (m *Manager) saveCurrent(readers []*reader.Reader) {
forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
if forgetNum > 0 {
m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
return
}
m.knownFiles = append(m.knownFiles, readers...)
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := m.findFingerprintMatch(fp); ok {
return m.readerFactory.Copy(oldReader, file)
// Check previous poll cycle for match
for i := 0; i < len(m.previousPollFiles); i++ {
oldReader := m.previousPollFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
// Keep the new reader and discard the old. This ensures that if the file was
// copied to another location and truncated, our handle is updated.
m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}
}

// If we don't match any previously known files, create a new reader from scratch
return m.readerFactory.NewReader(file, fp)
}

func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
// Iterate backwards to match newest first
for i := len(m.knownFiles) - 1; i >= 0; i-- {
oldReader := m.knownFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
// Remove the old reader from the list of known files. We will
// add it back in saveCurrent if it is still alive.
oldMetadata := m.knownFiles[i]
if fp.StartsWith(oldMetadata.Fingerprint) {
// Remove the old metadata from the list. We will keep updating it and save it again later.
m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
return oldReader, true
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}
}
return nil, false

// If we don't match any previously known files, create a new reader from scratch
return m.readerFactory.NewReader(file, fp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type detectLostFiles struct {
oldReaders []*reader.Reader
}

func newRoller() roller {
return &detectLostFiles{oldReaders: []*reader.Reader{}}
}

func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
// Detect files that have been rotated out of matching pattern
lostReaders := make([]*reader.Reader, 0, len(r.oldReaders))
lostReaders := make([]*reader.Reader, 0, len(m.previousPollFiles))

OUTER:
for _, oldReader := range r.oldReaders {
for i := 0; i < len(m.previousPollFiles); i++ {
oldReader := m.previousPollFiles[i]
for _, newReader := range newReaders {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
Expand All @@ -38,8 +32,8 @@ OUTER:
// At this point, we know that the file has been rotated. However, we do not know
// if it was moved or truncated. If truncated, then both handles point to the same
// file, in which case we should only read from it using the new reader. We can use
// the ValidateOrClose method to establish that the file has not been truncated.
if !oldReader.ValidateOrClose() {
// the Validate method to ensure the file has been truncated.
if !oldReader.Validate() {
continue OUTER
}
}
Expand All @@ -56,17 +50,3 @@ OUTER:
}
lostWG.Wait()
}

func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader.Reader) {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}

r.oldReaders = newReaders
}

func (r *detectLostFiles) cleanup() {
for _, oldReader := range r.oldReaders {
oldReader.Close()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build windows
// +build windows

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
Expand All @@ -9,8 +12,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type roller interface {
readLostFiles(context.Context, []*reader.Reader)
roll(context.Context, []*reader.Reader)
cleanup()
func (m *Manager) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
return
}
45 changes: 14 additions & 31 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)
Expand All @@ -32,43 +31,27 @@ type Factory struct {
TrimFunc trim.Func
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
}

func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
m := &Metadata{
Fingerprint: fp,
FileAttributes: map[string]any{},
}
m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}}
if f.Config.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.build(file, m)
}

// copy creates a deep copy of a reader
func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
return f.build(newFile, &Metadata{
Fingerprint: old.Fingerprint.Copy(),
Offset: old.Offset,
FileAttributes: util.MapCopy(old.FileAttributes),
HeaderFinalized: old.HeaderFinalized,
FlushState: &flush.State{
LastDataChange: old.FlushState.LastDataChange,
LastDataLength: old.FlushState.LastDataLength,
},
})
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
return f.NewReaderFromMetadata(file, m)
}

func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) {
func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
r = &Reader{
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
}

if m.FlushState == nil {
Expand Down
22 changes: 7 additions & 15 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -147,8 +146,8 @@ func (r *Reader) delete() {
}
}

// Close will close the file
func (r *Reader) Close() {
// Close will close the file and return the metadata
func (r *Reader) Close() *Metadata {
if r.file != nil {
if err := r.file.Close(); err != nil {
r.logger.Debugw("Problem closing reader", zap.Error(err))
Expand All @@ -161,6 +160,9 @@ func (r *Reader) Close() {
r.logger.Errorw("Failed to stop header pipeline", zap.Error(err))
}
}
m := r.Metadata
r.Metadata = nil
return m
}

// Read from the file and update the fingerprint if necessary
Expand Down Expand Up @@ -196,27 +198,17 @@ func (r *Reader) NameEquals(other *Reader) bool {
return r.fileName == other.fileName
}

// ValidateOrClose returns true if the reader still has a valid file handle, false otherwise.
// If false is returned, the file handle should be considered closed.
//
// It may create a new fingerprint from the old file handle and compare it to the
// previously known fingerprint. If there has been a change to the fingerprint
// (other than appended data), the file is considered truncated. Consequently, the
// reader will automatically close the file and drop the handle.
func (r *Reader) ValidateOrClose() bool {
// Validate returns true if the reader still has a valid file handle, false otherwise.
func (r *Reader) Validate() bool {
if r.file == nil {
return false
}
refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize)
if err != nil {
r.logger.Debugw("Closing unreadable file", zap.Error(err), zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
if refreshedFingerprint.StartsWith(r.Fingerprint) {
return true
}
r.logger.Debugw("Closing truncated file", zap.String(attrs.LogFileName, r.fileName))
r.Close()
return false
}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestPersistFlusher(t *testing.T) {
expectNoTokensUntil(t, emitChan, 2*flushPeriod)

// A copy of the reader should remember that we last emitted about 200ms ago.
copyReader, err := f.Copy(r, temp)
copyReader, err := f.NewReaderFromMetadata(temp, r.Metadata)
assert.NoError(t, err)

// This time, the flusher will kick in and we should emit the unfinished log.
Expand Down
Loading

0 comments on commit 01f939b

Please sign in to comment.