diff --git a/.chloggen/pkg-stanza-metadata-flush.yaml b/.chloggen/pkg-stanza-metadata-flush.yaml new file mode 100755 index 0000000000000..b44a8f16561e8 --- /dev/null +++ b/.chloggen/pkg-stanza-metadata-flush.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate 'flush.WithPeriod'. Use 'flush.State.Func' instead. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27843] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/benchmark_test.go b/pkg/stanza/fileconsumer/benchmark_test.go index a373129aa4c7d..0d6cbd9335f8d 100644 --- a/pkg/stanza/fileconsumer/benchmark_test.go +++ b/pkg/stanza/fileconsumer/benchmark_test.go @@ -127,6 +127,20 @@ func BenchmarkFileInput(b *testing.B) { return cfg }, }, + { + name: "NoFlush", + paths: []string{ + "file0.log", + }, + config: func() *Config { + cfg := NewConfig() + cfg.Include = []string{ + "file*.log", + } + cfg.FlushPeriod = 0 + return cfg + }, + }, } for _, bench := range cases { diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index de4a9a9a5377a..03504a0dbcd22 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -18,7 +18,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -113,9 +112,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, trimFunc = c.TrimConfig.Func() } - // Ensure that splitter is buildable - factory := splitter.NewFactory(splitFunc, trimFunc, c.FlushPeriod, int(c.MaxLogSize)) - return c.buildManager(logger, emit, factory) + return c.buildManager(logger, emit, splitFunc, trimFunc) } // BuildWithSplitFunc will build a file input operator with customized splitFunc function @@ -123,13 +120,10 @@ func (c Config) 
BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback if err := c.validate(); err != nil { return nil, err } - - // Ensure that splitter is buildable - factory := splitter.NewFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod, int(c.MaxLogSize)) - return c.buildManager(logger, emit, factory) + return c.buildManager(logger, emit, splitFunc, c.TrimConfig.Func()) } -func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, factory splitter.Factory) (*Manager, error) { +func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, splitFunc bufio.SplitFunc, trimFunc trim.Func) (*Manager, error) { if emit == nil { return nil, fmt.Errorf("must provide emit function") } @@ -175,11 +169,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact IncludeFileNameResolved: c.IncludeFileNameResolved, IncludeFilePathResolved: c.IncludeFilePathResolved, DeleteAtEOF: c.DeleteAfterRead, + FlushTimeout: c.FlushPeriod, }, - FromBeginning: startAtBeginning, - SplitterFactory: factory, - Encoding: enc, - HeaderConfig: hCfg, + FromBeginning: startAtBeginning, + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trimFunc, + HeaderConfig: hCfg, }, fileMatcher: fileMatcher, roller: newRoller(), diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index e93489f9c902b..0df8e8a945b5f 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "time" "go.uber.org/zap" "golang.org/x/text/encoding" @@ -16,53 +17,65 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" - 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type Factory struct { *zap.SugaredLogger - Config *Config - FromBeginning bool - SplitterFactory splitter.Factory - Encoding encoding.Encoding - HeaderConfig *header.Config + Config *Config + FromBeginning bool + Encoding encoding.Encoding + HeaderConfig *header.Config + SplitFunc bufio.SplitFunc + TrimFunc trim.Func } func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { - return f.build(file, &Metadata{ + m := &Metadata{ Fingerprint: fp, FileAttributes: map[string]any{}, - }, f.SplitterFactory.SplitFunc()) + } + if f.Config.FlushTimeout > 0 { + m.FlushState = &flush.State{LastDataChange: time.Now()} + } + return f.build(file, m) } // copy creates a deep copy of a reader func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) { - lineSplitFunc := old.lineSplitFunc - if lineSplitFunc == nil { - lineSplitFunc = f.SplitterFactory.SplitFunc() - } return f.build(newFile, &Metadata{ Fingerprint: old.Fingerprint.Copy(), Offset: old.Offset, FileAttributes: util.MapCopy(old.FileAttributes), HeaderFinalized: old.HeaderFinalized, - }, lineSplitFunc) + FlushState: &flush.State{ + LastDataChange: old.FlushState.LastDataChange, + LastDataLength: old.FlushState.LastDataLength, + }, + }) } func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { return fingerprint.New(file, f.Config.FingerprintSize) } -func (f *Factory) build(file *os.File, m *Metadata, lineSplitFunc bufio.SplitFunc) (r *Reader, err error) { +func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) { r = &Reader{ - Config: f.Config, - Metadata: m, - file: file, - 
fileName: file.Name(), - logger: f.SugaredLogger.With("path", file.Name()), - decoder: decode.New(f.Encoding), - lineSplitFunc: lineSplitFunc, + Config: f.Config, + Metadata: m, + file: file, + fileName: file.Name(), + logger: f.SugaredLogger.With("path", file.Name()), + decoder: decode.New(f.Encoding), + } + + if m.FlushState == nil { + r.lineSplitFunc = trim.WithFunc(trim.ToLength(f.SplitFunc, f.Config.MaxLogSize), f.TrimFunc) + } else { + flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout) + r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.Config.MaxLogSize), f.TrimFunc) } if !f.FromBeginning { diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 76bca6882e9f1..08d83d7f999fd 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "os" + "time" "go.uber.org/zap" @@ -18,6 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" ) type Config struct { @@ -29,6 +31,7 @@ type Config struct { IncludeFileNameResolved bool IncludeFilePathResolved bool DeleteAtEOF bool + FlushTimeout time.Duration } type Metadata struct { @@ -36,6 +39,7 @@ type Metadata struct { Offset int64 FileAttributes map[string]any HeaderFinalized bool + FlushState *flush.State } // Reader manages a single file diff --git a/pkg/stanza/fileconsumer/internal/splitter/factory.go b/pkg/stanza/fileconsumer/internal/splitter/factory.go index 7679cf305abb2..2c9d7c64b7564 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/factory.go +++ 
b/pkg/stanza/fileconsumer/internal/splitter/factory.go @@ -5,43 +5,19 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" - "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -type Factory interface { - SplitFunc() bufio.SplitFunc -} - -type factory struct { - splitFunc bufio.SplitFunc - trimFunc trim.Func - flushPeriod time.Duration - maxLength int -} - -var _ Factory = (*factory)(nil) - -func NewFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration, maxLength int) Factory { - return &factory{ - splitFunc: splitFunc, - trimFunc: trimFunc, - flushPeriod: flushPeriod, - maxLength: maxLength, - } -} - -// SplitFunc builds a bufio.SplitFunc based on the configuration -func (f *factory) SplitFunc() bufio.SplitFunc { - // First apply the base splitFunc. - // If no token is found, we may still flush one based on timing. - // If a token is emitted for any reason, we must then apply trim rules. - // We must trim to max length _before_ trimming whitespace because otherwise we - // cannot properly keep track of the number of bytes to advance. - // For instance, if we have advance: 5, token: []byte(" foo "): - // Trimming whitespace first would result in advance: 5, token: []byte("foo") - // Then if we trim to max length of 2, we don't know whether or not to reduce advance. - return trim.WithFunc(trim.ToLength(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.maxLength), f.trimFunc) +// Func builds a bufio.SplitFunc based on the configuration +// First apply the base splitFunc. +// If a token is emitted for any reason, we must then apply trim rules. +// We must trim to max length _before_ trimming whitespace because otherwise we +// cannot properly keep track of the number of bytes to advance. 
+// For instance, if we have advance: 5, token: []byte(" foo "): +// +// Trimming whitespace first would result in advance: 5, token: []byte("foo") +// Then if we trim to max length of 2, we don't know whether or not to reduce advance. +func Func(splitFunc bufio.SplitFunc, maxLength int, trimFunc trim.Func) bufio.SplitFunc { + return trim.WithFunc(trim.ToLength(splitFunc, maxLength), trimFunc) } diff --git a/pkg/stanza/fileconsumer/internal/splitter/factory_test.go b/pkg/stanza/fileconsumer/internal/splitter/factory_test.go index dbf5226cfd96e..83628cff78fac 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/factory_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/factory_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split/splittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) @@ -134,7 +135,13 @@ func TestFactorySplitFunc(t *testing.T) { } for _, tc := range testCases { - factory := NewFactory(tc.baseFunc, tc.trimFunc, tc.flushPeriod, tc.maxLength) - t.Run(tc.name, splittest.New(factory.SplitFunc(), tc.input, tc.steps...)) + var splitFunc bufio.SplitFunc + if tc.flushPeriod > 0 { + s := &flush.State{LastDataChange: time.Now()} + splitFunc = Func(s.Func(tc.baseFunc, tc.flushPeriod), tc.maxLength, tc.trimFunc) + } else { + splitFunc = Func(tc.baseFunc, tc.maxLength, tc.trimFunc) + } + t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...)) } } diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 98896b32409dd..ec3672123653d 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -15,7 +15,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" @@ -237,10 +236,12 @@ func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPer FingerprintSize: fingerprint.DefaultSize, MaxLogSize: maxLogSize, Emit: testEmitFunc(emitChan), + FlushTimeout: flushPeriod, }, - FromBeginning: true, - SplitterFactory: splitter.NewFactory(splitFunc, trim.Whitespace, flushPeriod, maxLogSize), - Encoding: enc, + FromBeginning: true, + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trim.Whitespace, }, emitChan } diff --git a/pkg/stanza/flush/flush.go b/pkg/stanza/flush/flush.go index e60b6e18e3a84..9a7291dc19b9f 100644 --- a/pkg/stanza/flush/flush.go +++ b/pkg/stanza/flush/flush.go @@ -8,24 +8,19 @@ import ( "time" ) -// WithPeriod wraps a bufio.SplitFunc with a timer. +type State struct { + LastDataChange time.Time + LastDataLength int +} + +// Func wraps a bufio.SplitFunc with a timer. // When the timer expires, an incomplete token may be returned. // The timer will reset any time the data parameter changes. 
-func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc { +func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc { if period <= 0 { return splitFunc } - f := &flusher{lastDataChange: time.Now(), forcePeriod: period} - return f.splitFunc(splitFunc) -} -type flusher struct { - forcePeriod time.Duration - lastDataChange time.Time - previousDataLength int -} - -func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { return func(data []byte, atEOF bool) (int, []byte, error) { advance, token, err := splitFunc(data, atEOF) @@ -36,31 +31,37 @@ func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc { // If there's a token, return it if token != nil { - f.lastDataChange = time.Now() - f.previousDataLength = 0 + s.LastDataChange = time.Now() + s.LastDataLength = 0 return advance, token, err } // Can't flush something from nothing if atEOF && len(data) == 0 { - f.previousDataLength = 0 + s.LastDataLength = 0 return 0, nil, nil } // Flush timed out - if time.Since(f.lastDataChange) > f.forcePeriod { - f.lastDataChange = time.Now() - f.previousDataLength = 0 + if time.Since(s.LastDataChange) > period { + s.LastDataChange = time.Now() + s.LastDataLength = 0 return len(data), data, nil } // We're seeing new data so postpone the next flush - if len(data) > f.previousDataLength { - f.lastDataChange = time.Now() - f.previousDataLength = len(data) + if len(data) > s.LastDataLength { + s.LastDataChange = time.Now() + s.LastDataLength = len(data) } // Ask for more data return 0, nil, nil } } + +// Deprecated: [v0.88.0] Use State.Func instead. 
+func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc { + s := &State{LastDataChange: time.Now()} + return s.Func(splitFunc, period) +} diff --git a/pkg/stanza/flush/flush_test.go b/pkg/stanza/flush/flush_test.go index 97bcbb2930d0d..e8bb369ac77c2 100644 --- a/pkg/stanza/flush/flush_test.go +++ b/pkg/stanza/flush/flush_test.go @@ -41,7 +41,9 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - splitFunc := WithPeriod(tc.baseFunc, tc.flushPeriod) - t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...)) + t.Run(tc.name+"/WithPeriod", splittest.New(WithPeriod(tc.baseFunc, tc.flushPeriod), tc.input, tc.steps...)) + + previousState := &State{LastDataChange: time.Now()} + t.Run(tc.name+"/Func", splittest.New(previousState.Func(tc.baseFunc, tc.flushPeriod), tc.input, tc.steps...)) } } diff --git a/pkg/stanza/split/splittest/splittest.go b/pkg/stanza/split/splittest/splittest.go index e47d7c4297003..8be0fdcaeb594 100644 --- a/pkg/stanza/split/splittest/splittest.go +++ b/pkg/stanza/split/splittest/splittest.go @@ -12,15 +12,13 @@ import ( ) type Step struct { - waitAtEOF func() bool - validate func(t *testing.T, advance int, token []byte, err error) + tick time.Duration + timeout time.Duration + validate func(t *testing.T, advance int, token []byte, err error) } -var noWait = func() bool { return false } - func ExpectReadMore() Step { return Step{ - waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.True(t, needMoreData(advance, token, err)) }, @@ -33,7 +31,6 @@ func ExpectToken(expectToken string) Step { func ExpectAdvanceToken(expectAdvance int, expectToken string) Step { return Step{ - waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.Equal(t, expectAdvance, advance) assert.Equal(t, []byte(expectToken), token) @@ -44,7 +41,6 @@ func ExpectAdvanceToken(expectAdvance int, expectToken string) Step { func 
ExpectAdvanceNil(expectAdvance int) Step { return Step{ - waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.Equal(t, expectAdvance, advance) assert.Equal(t, []byte(nil), token) @@ -55,7 +51,6 @@ func ExpectAdvanceNil(expectAdvance int) Step { func ExpectError(expectErr string) Step { return Step{ - waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.EqualError(t, err, expectErr) }, @@ -63,12 +58,8 @@ func ExpectError(expectErr string) Step { } func Eventually(step Step, maxTime time.Duration, tick time.Duration) Step { - var waited time.Duration - step.waitAtEOF = func() bool { - time.Sleep(tick) - waited += tick - return waited < maxTime - } + step.tick = tick + step.timeout = maxTime return step } @@ -86,7 +77,8 @@ func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T var token []byte var err error - for needMoreData(advance, token, err) && (!atEOF || step.waitAtEOF()) { + var waited time.Duration + for needMoreData(advance, token, err) { // Grow the buffer at a slow pace to ensure that we're // exercising the split func's ability to ask for more data. bufferSize = 1 + bufferSize + bufferSize/8 @@ -99,6 +91,14 @@ func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T } advance, token, err = splitFunc(data, atEOF) // t.Errorf("\nbuffer: %d, advance: %d, token: %q, err: %v", bufferSize, advance, token, err) + + if atEOF { + if waited >= step.timeout { + break + } + time.Sleep(step.tick) + waited += step.tick + } } offset += advance step.validate(t, advance, token, err)