Skip to content

Commit

Permalink
[chore][pkg/stanza] Clean up flush function (open-telemetry#27412)
Browse files Browse the repository at this point in the history
This PR simplifies the flush function. The behavior should be the same
but the representation is simpler.
  • Loading branch information
djaglowski committed Oct 4, 2023
1 parent fef6211 commit a690ff9
Showing 1 changed file with 33 additions and 58 deletions.
91 changes: 33 additions & 58 deletions pkg/stanza/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,59 @@ import (
"time"
)

// Wrap a bufio.SplitFunc with a flusher
// WithPeriod wraps a bufio.SplitFunc with a timer.
// When the timer expires, an incomplete token may be returned.
// The timer will reset any time the data parameter changes.
func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
if period <= 0 {
return splitFunc
}
f := &flusher{
lastDataChange: time.Now(),
forcePeriod: period,
previousDataLength: 0,
}
f := &flusher{lastDataChange: time.Now(), forcePeriod: period}
return f.splitFunc(splitFunc)
}

// flusher keeps information about flush state
type flusher struct {
// forcePeriod defines time from last flush which should pass before setting force to true.
// Never forces if forcePeriod is set to 0
forcePeriod time.Duration

// lastDataChange tracks date of last data change (including new data and flushes)
lastDataChange time.Time

// previousDataLength:
// if previousDataLength = 0 - no new data have been received after flush
// if previousDataLength > 0 - there is data which has not been flushed yet and it doesn't changed since lastDataChange
forcePeriod time.Duration
lastDataChange time.Time
previousDataLength int
}

func (f *flusher) updateDataChangeTime(length int) {
// Skip if length is greater than 0 and didn't changed
if length > 0 && length == f.previousDataLength {
return
}

// update internal properties with new values if data length changed
// because it means that data is flowing and being processed
f.previousDataLength = length
f.lastDataChange = time.Now()
}

// Flushed reset data length
func (f *flusher) flushed() {
f.updateDataChangeTime(0)
}

// ShouldFlush returns true if data should be forcefully flushed
func (f *flusher) shouldFlush() bool {
// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)
return func(data []byte, atEOF bool) (int, []byte, error) {
advance, token, err := splitFunc(data, atEOF)

// Return as it is in case of error
// Don't interfere with errors
if err != nil {
return
return advance, token, err
}

// Return token
// If there's a token, return it
if token != nil {
// Inform flusher that we just flushed
f.flushed()
return
f.lastDataChange = time.Now()
f.previousDataLength = 0
return advance, token, err
}

// Can't flush something from nothing
if atEOF && len(data) == 0 {
f.previousDataLength = 0
return 0, nil, nil
}

// Flush timed out
if time.Since(f.lastDataChange) > f.forcePeriod {
f.lastDataChange = time.Now()
f.previousDataLength = 0
return len(data), data, nil
}

// If there is no token, force flush eventually
if f.shouldFlush() {
// Inform flusher that we just flushed
f.flushed()
token = data
advance = len(data)
return
// We're seeing new data so postpone the next flush
if len(data) > f.previousDataLength {
f.lastDataChange = time.Now()
f.previousDataLength = len(data)
}

// Inform flusher that we didn't flushed
f.updateDataChangeTime(len(data))
return
// Ask for more data
return 0, nil, nil
}
}

0 comments on commit a690ff9

Please sign in to comment.