From a690ff9b3c3fb6583e8565c28b24b230c6c8345f Mon Sep 17 00:00:00 2001
From: Daniel Jaglowski
Date: Wed, 4 Oct 2023 09:04:49 -0600
Subject: [PATCH] [chore][pkg/stanza] Clean up flush function (#27412)

This PR simplifies the flush function. The behavior should be the same but the representation is simpler.
---
 pkg/stanza/flush/flush.go | 91 ++++++++++++++-------------------------
 1 file changed, 33 insertions(+), 58 deletions(-)

diff --git a/pkg/stanza/flush/flush.go b/pkg/stanza/flush/flush.go
index 4197f527972d5..e60b6e18e3a84 100644
--- a/pkg/stanza/flush/flush.go
+++ b/pkg/stanza/flush/flush.go
@@ -8,84 +8,59 @@ import (
 	"time"
 )
 
-// Wrap a bufio.SplitFunc with a flusher
+// WithPeriod wraps a bufio.SplitFunc with a timer.
+// When the timer expires, an incomplete token may be returned.
+// The timer will reset any time the data parameter changes.
 func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
 	if period <= 0 {
 		return splitFunc
 	}
-	f := &flusher{
-		lastDataChange:     time.Now(),
-		forcePeriod:        period,
-		previousDataLength: 0,
-	}
+	f := &flusher{lastDataChange: time.Now(), forcePeriod: period}
 	return f.splitFunc(splitFunc)
 }
 
-// flusher keeps information about flush state
 type flusher struct {
-	// forcePeriod defines time from last flush which should pass before setting force to true.
-	// Never forces if forcePeriod is set to 0
-	forcePeriod time.Duration
-
-	// lastDataChange tracks date of last data change (including new data and flushes)
-	lastDataChange time.Time
-
-	// previousDataLength:
-	// if previousDataLength = 0 - no new data have been received after flush
-	// if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange
+	forcePeriod        time.Duration
+	lastDataChange     time.Time
 	previousDataLength int
 }
 
-func (f *flusher) updateDataChangeTime(length int) {
-	// Skip if length is greater than 0 and didn't changed
-	if length > 0 && length == f.previousDataLength {
-		return
-	}
-
-	// update internal properties with new values if data length changed
-	// because it means that data is flowing and being processed
-	f.previousDataLength = length
-	f.lastDataChange = time.Now()
-}
-
-// Flushed reset data length
-func (f *flusher) flushed() {
-	f.updateDataChangeTime(0)
-}
-
-// ShouldFlush returns true if data should be forcefully flushed
-func (f *flusher) shouldFlush() bool {
-	// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
-	return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
-}
-
 func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
-	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		advance, token, err = splitFunc(data, atEOF)
+	return func(data []byte, atEOF bool) (int, []byte, error) {
+		advance, token, err := splitFunc(data, atEOF)
 
-		// Return as it is in case of error
+		// Don't interfere with errors
 		if err != nil {
-			return
+			return advance, token, err
 		}
 
-		// Return token
+		// If there's a token, return it
 		if token != nil {
-			// Inform flusher that we just flushed
-			f.flushed()
-			return
+			f.lastDataChange = time.Now()
+			f.previousDataLength = 0
+			return advance, token, err
+		}
+
+		// Can't flush something from nothing
+		if atEOF && len(data) == 0 {
+			f.previousDataLength = 0
+			return 0, nil, nil
+		}
+
+		// Flush timed out
+		if time.Since(f.lastDataChange) > f.forcePeriod {
+			f.lastDataChange = time.Now()
+			f.previousDataLength = 0
+			return len(data), data, nil
 		}
 
-		// If there is no token, force flush eventually
-		if f.shouldFlush() {
-			// Inform flusher that we just flushed
-			f.flushed()
-			token = data
-			advance = len(data)
-			return
+		// We're seeing new data so postpone the next flush
+		if len(data) > f.previousDataLength {
+			f.lastDataChange = time.Now()
+			f.previousDataLength = len(data)
 		}
 
-		// Inform flusher that we didn't flushed
-		f.updateDataChangeTime(len(data))
-		return
+		// Ask for more data
+		return 0, nil, nil
 	}
 }