diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index 2b29d9ddbe749..9206479f3e0b1 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -89,8 +89,8 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
 	}
 
 	// Ensure that splitter is buildable
-	factory := splitter.NewMultilineFactory(c.Splitter)
-	if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
+	factory := splitter.NewMultilineFactory(c.Splitter, int(c.MaxLogSize))
+	if _, err := factory.Build(); err != nil {
 		return nil, err
 	}
 
@@ -109,7 +109,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback
 
 	// Ensure that splitter is buildable
 	factory := splitter.NewCustomFactory(c.Splitter.Flusher, splitFunc)
-	if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
+	if _, err := factory.Build(); err != nil {
 		return nil, err
 	}
diff --git a/pkg/stanza/fileconsumer/internal/header/config.go b/pkg/stanza/fileconsumer/internal/header/config.go
index 5645607640bd6..9316c734dd515 100644
--- a/pkg/stanza/fileconsumer/internal/header/config.go
+++ b/pkg/stanza/fileconsumer/internal/header/config.go
@@ -69,7 +69,7 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod
 		return nil, fmt.Errorf("failed to compile `pattern`: %w", err)
 	}
 
-	splitFunc, err := tokenize.NewNewlineSplitFunc(enc, false, func(b []byte) []byte {
+	splitFunc, err := tokenize.NewlineSplitFunc(enc, false, func(b []byte) []byte {
 		return bytes.Trim(b, "\r\n")
 	})
 	if err != nil {
diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go
index d4059962bd196..e8941b3bdfc70 100644
--- a/pkg/stanza/fileconsumer/internal/splitter/custom.go
+++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go
@@ -10,22 +10,17 @@ import (
 )
 
 type customFactory struct {
-	Flusher  tokenize.FlusherConfig
-	Splitter bufio.SplitFunc
+	flusherCfg tokenize.FlusherConfig
+	splitFunc  bufio.SplitFunc
 }
 
 var _ Factory = (*customFactory)(nil)
 
-func NewCustomFactory(
-	flusher tokenize.FlusherConfig,
-	splitter bufio.SplitFunc) Factory {
-	return &customFactory{
-		Flusher:  flusher,
-		Splitter: splitter,
-	}
+func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory {
+	return &customFactory{flusherCfg: flusherCfg, splitFunc: splitFunc}
 }
 
 // Build builds Multiline Splitter struct
-func (factory *customFactory) Build(_ int) (bufio.SplitFunc, error) {
-	return factory.Flusher.Build().SplitFunc(factory.Splitter), nil
+func (f *customFactory) Build() (bufio.SplitFunc, error) {
+	return f.flusherCfg.Wrap(f.splitFunc), nil
 }
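For orientation, a minimal sketch of the new factory contract. This is illustrative only and not part of the diff: the `splitter` package is internal, so outside code cannot actually import it; the identifiers come from the diff, while the `main` wrapper and input are invented. Every input, including the flusher config and the wrapped split function, is now captured at construction time, so `Build` takes no arguments:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func main() {
	// The flusher config and split function are fixed up front;
	// Build() no longer needs a maxLogSize argument.
	factory := splitter.NewCustomFactory(tokenize.NewFlusherConfig(), bufio.ScanLines)
	splitFunc, err := factory.Build()
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("one\ntwo\n"))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Println(scanner.Text()) // prints "one", then "two"
	}
}
```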
diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go
index 2d40533f1b509..6ccdeb22f220b 100644
--- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go
+++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go
@@ -17,13 +17,9 @@ func TestCustomFactory(t *testing.T) {
 		Flusher  tokenize.FlusherConfig
 		Splitter bufio.SplitFunc
 	}
-	type args struct {
-		maxLogSize int
-	}
 	tests := []struct {
 		name    string
 		fields  fields
-		args    args
 		wantErr bool
 	}{
 		{
@@ -34,16 +30,13 @@ func TestCustomFactory(t *testing.T) {
 					return len(data), data, nil
 				},
 			},
-			args: args{
-				maxLogSize: 1024,
-			},
 			wantErr: false,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			factory := NewCustomFactory(tt.fields.Flusher, tt.fields.Splitter)
-			got, err := factory.Build(tt.args.maxLogSize)
+			got, err := factory.Build()
 			if (err != nil) != tt.wantErr {
 				t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/pkg/stanza/fileconsumer/internal/splitter/factory.go b/pkg/stanza/fileconsumer/internal/splitter/factory.go
index a70c3a83a9615..85d8b0461969a 100644
--- a/pkg/stanza/fileconsumer/internal/splitter/factory.go
+++ b/pkg/stanza/fileconsumer/internal/splitter/factory.go
@@ -8,5 +8,5 @@ import (
 )
 
 type Factory interface {
-	Build(maxLogSize int) (bufio.SplitFunc, error)
+	Build() (bufio.SplitFunc, error)
 }
diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go
index 66b1d2c8a5aa3..690f2b64a9c17 100644
--- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go
+++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go
@@ -6,32 +6,21 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co
 import (
 	"bufio"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
 )
 
 type multilineFactory struct {
-	tokenize.SplitterConfig
+	splitterCfg tokenize.SplitterConfig
+	maxLogSize  int
 }
 
 var _ Factory = (*multilineFactory)(nil)
 
-func NewMultilineFactory(splitter tokenize.SplitterConfig) Factory {
-	return &multilineFactory{
-		SplitterConfig: splitter,
-	}
+func NewMultilineFactory(splitterCfg tokenize.SplitterConfig, maxLogSize int) Factory {
+	return &multilineFactory{splitterCfg: splitterCfg, maxLogSize: maxLogSize}
 }
 
 // Build builds Multiline Splitter struct
-func (factory *multilineFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
-	enc, err := decoder.LookupEncoding(factory.Encoding)
-	if err != nil {
-		return nil, err
-	}
-	flusher := factory.Flusher.Build()
-	splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
-	if err != nil {
-		return nil, err
-	}
-	return splitter, nil
+func (f *multilineFactory) Build() (bufio.SplitFunc, error) {
+	return f.splitterCfg.Build(false, f.maxLogSize)
 }
diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
index 99a8b8a2a44cb..07decb7d4fa8f 100644
--- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
+++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
@@ -59,8 +59,8 @@ func TestMultilineBuild(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			factory := NewMultilineFactory(tt.splitterConfig)
-			got, err := factory.Build(tt.args.maxLogSize)
+			factory := NewMultilineFactory(tt.splitterConfig, tt.args.maxLogSize)
+			got, err := factory.Build()
 			if (err != nil) != tt.wantErr {
 				t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go
index 8e51a56942708..9d7b40a966455 100644
--- a/pkg/stanza/fileconsumer/reader_factory.go
+++ b/pkg/stanza/fileconsumer/reader_factory.go
@@ -29,7 +29,7 @@ type readerFactory struct {
 }
 
 func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
-	lineSplitFunc, err := f.splitterFactory.Build(f.readerConfig.maxLogSize)
+	lineSplitFunc, err := f.splitterFactory.Build()
 	if err != nil {
 		return nil, err
 	}
@@ -44,7 +44,7 @@ func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
 	var err error
 	lineSplitFunc := old.lineSplitFunc
 	if lineSplitFunc == nil {
-		lineSplitFunc, err = f.splitterFactory.Build(f.readerConfig.maxLogSize)
+		lineSplitFunc, err = f.splitterFactory.Build()
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go
index 4f2d3be9da948..9a9ead149aca6 100644
--- a/pkg/stanza/fileconsumer/reader_test.go
+++ b/pkg/stanza/fileconsumer/reader_test.go
@@ -180,7 +180,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
 		Encoding:  "utf-8",
 		Flusher:   tokenize.NewFlusherConfig(),
 		Multiline: mlc,
-	})
+	}, 15)
 	f.readerConfig.maxLogSize = 15
 
 	temp := openTemp(t, t.TempDir())
@@ -249,7 +249,7 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte
 			emit:          testEmitFunc(emitChan),
 		},
 		fromBeginning:   true,
-		splitterFactory: splitter.NewMultilineFactory(splitterConfig),
+		splitterFactory: splitter.NewMultilineFactory(splitterConfig, defaultMaxLogSize),
 		encoding:        enc,
 	}, emitChan
 }
diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go
index caf6da6baf0d3..e0222089ea651 100644
--- a/pkg/stanza/operator/input/tcp/tcp.go
+++ b/pkg/stanza/operator/input/tcp/tcp.go
@@ -83,7 +83,7 @@ type BaseConfig struct {
 type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)
 
 func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
-	splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
+	splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, int(c.MaxLogSize))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go
index e92142dc54dd0..b8625c2577490 100644
--- a/pkg/stanza/operator/input/udp/udp.go
+++ b/pkg/stanza/operator/input/udp/udp.go
@@ -91,7 +91,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
 	}
 
 	// Build multiline
-	splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize)
+	splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, MaxUDPSize)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/stanza/tokenize/flusher.go b/pkg/stanza/tokenize/flusher.go
index 922606179e5a7..dea09214d85c0 100644
--- a/pkg/stanza/tokenize/flusher.go
+++ b/pkg/stanza/tokenize/flusher.go
@@ -8,6 +8,8 @@ import (
 	"time"
 )
 
+const DefaultFlushPeriod = 500 * time.Millisecond
+
 // FlusherConfig is a configuration of Flusher helper
 type FlusherConfig struct {
 	Period time.Duration `mapstructure:"force_flush_period"`
@@ -17,21 +19,22 @@ type FlusherConfig struct {
 func NewFlusherConfig() FlusherConfig {
 	return FlusherConfig{
 		// Empty or `0s` means that we will never force flush
-		Period: time.Millisecond * 500,
+		Period: DefaultFlushPeriod,
 	}
 }
 
-// Build creates Flusher from configuration
-func (c *FlusherConfig) Build() *Flusher {
-	return &Flusher{
+// Wrap wraps a bufio.SplitFunc so that pending data is force-flushed after the configured period
+func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+	f := &flusher{
 		lastDataChange:     time.Now(),
 		forcePeriod:        c.Period,
 		previousDataLength: 0,
 	}
+	return f.splitFunc(splitFunc)
 }
 
-// Flusher keeps information about flush state
-type Flusher struct {
+// flusher keeps information about flush state
+type flusher struct {
 	// forcePeriod defines time from last flush which should pass before setting force to true.
 	// Never forces if forcePeriod is set to 0
 	forcePeriod time.Duration
@@ -45,7 +48,7 @@ type Flusher struct {
 	previousDataLength int
 }
 
-func (f *Flusher) UpdateDataChangeTime(length int) {
+func (f *flusher) updateDataChangeTime(length int) {
 	// Skip if length is greater than 0 and didn't changed
 	if length > 0 && length == f.previousDataLength {
 		return
@@ -58,17 +61,17 @@ func (f *Flusher) UpdateDataChangeTime(length int) {
 }
 
 // Flushed reset data length
-func (f *Flusher) Flushed() {
-	f.UpdateDataChangeTime(0)
+func (f *flusher) flushed() {
+	f.updateDataChangeTime(0)
 }
 
 // ShouldFlush returns true if data should be forcefully flushed
-func (f *Flusher) ShouldFlush() bool {
+func (f *flusher) shouldFlush() bool {
 	// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
 	return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
 }
 
-func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
+func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 		advance, token, err = splitFunc(data, atEOF)
 
@@ -80,21 +83,21 @@ func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
 		// Return token
 		if token != nil {
 			// Inform flusher that we just flushed
-			f.Flushed()
+			f.flushed()
 			return
 		}
 
 		// If there is no token, force flush eventually
-		if f.ShouldFlush() {
+		if f.shouldFlush() {
 			// Inform flusher that we just flushed
-			f.Flushed()
+			f.flushed()
 			token = trimWhitespacesFunc(data)
 			advance = len(data)
 			return
 		}
 
 		// Inform flusher that we didn't flushed
-		f.UpdateDataChangeTime(len(data))
+		f.updateDataChangeTime(len(data))
 		return
 	}
 }
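The exported `Flusher` type is gone; `FlusherConfig.Wrap` is now the only entry point, and all flush state lives inside the returned closure. A minimal sketch of the behavior, driving the split function by hand; the period and input are invented, and the final token assumes the package's trim function leaves a whitespace-free buffer unchanged:

```go
package main

import (
	"bufio"
	"fmt"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func main() {
	cfg := tokenize.FlusherConfig{Period: 50 * time.Millisecond}
	split := cfg.Wrap(bufio.ScanLines)

	data := []byte("partial line with no trailing newline")

	// First call: the inner ScanLines sees no newline and no EOF, so it
	// requests more data; the wrapper records the pending data length.
	advance, token, _ := split(data, false)
	fmt.Println(advance, token) // 0 []

	// Once the period elapses with the data unchanged, the wrapper
	// force-flushes the pending bytes even though no newline arrived.
	time.Sleep(60 * time.Millisecond)
	advance, token, _ = split(data, false)
	fmt.Println(advance, string(token)) // 37 and the full line
}
```

Note that `Wrap` allocates fresh flush state on each call, so every wrapped split function tracks its own inactivity window.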
diff --git a/pkg/stanza/tokenize/multiline.go b/pkg/stanza/tokenize/multiline.go
index 99682334e208e..8b9a7a20ac07b 100644
--- a/pkg/stanza/tokenize/multiline.go
+++ b/pkg/stanza/tokenize/multiline.go
@@ -15,7 +15,6 @@ import (
 // Multiline consists of splitFunc and variables needed to perform force flush
 type Multiline struct {
 	SplitFunc bufio.SplitFunc
-	Force     *Flusher
 }
 
 // NewMultilineConfig creates a new Multiline config
@@ -33,12 +32,12 @@ type MultilineConfig struct {
 }
 
 // Build will build a Multiline operator.
-func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, force *Flusher, maxLogSize int) (bufio.SplitFunc, error) {
-	return c.getSplitFunc(enc, flushAtEOF, force, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces)
+func (c MultilineConfig) Build(enc encoding.Encoding, flushAtEOF, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool, maxLogSize int) (bufio.SplitFunc, error) {
+	return c.getSplitFunc(enc, flushAtEOF, maxLogSize, preserveLeadingWhitespaces, preserveTrailingWhitespaces)
 }
 
 // getSplitFunc returns split function for bufio.Scanner basing on configured pattern
-func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, force *Flusher, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
+func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, preserveLeadingWhitespaces, preserveTrailingWhitespaces bool) (bufio.SplitFunc, error) {
 	endPattern := c.LineEndPattern
 	startPattern := c.LineStartPattern
 
@@ -53,9 +52,9 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo
 	case enc == encoding.Nop && (endPattern != "" || startPattern != ""):
 		return nil, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")
 	case enc == encoding.Nop:
-		return SplitNone(maxLogSize), nil
+		return NoSplitFunc(maxLogSize), nil
 	case endPattern == "" && startPattern == "":
-		splitFunc, err = NewNewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+		splitFunc, err = NewlineSplitFunc(enc, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
 		if err != nil {
 			return nil, err
 		}
@@ -64,27 +63,22 @@ func (c MultilineConfig) getSplitFunc(enc encoding.Encoding, flushAtEOF bool, fo
 		if err != nil {
 			return nil, fmt.Errorf("compile line end regex: %w", err)
 		}
-		splitFunc = NewLineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+		splitFunc = LineEndSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
 	case startPattern != "":
 		re, err := regexp.Compile("(?m)" + c.LineStartPattern)
 		if err != nil {
 			return nil, fmt.Errorf("compile line start regex: %w", err)
 		}
-		splitFunc = NewLineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
+		splitFunc = LineStartSplitFunc(re, flushAtEOF, getTrimFunc(preserveLeadingWhitespaces, preserveTrailingWhitespaces))
 	default:
 		return nil, fmt.Errorf("unreachable")
 	}
-
-	if force != nil {
-		return force.SplitFunc(splitFunc), nil
-	}
-	return splitFunc, nil
+	return splitFunc, nil
 }
 
-// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
+// LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
 // tokens that start with a match to the regex pattern provided
-func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
+func LineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 		firstLoc := re.FindIndex(data)
 		if firstLoc == nil {
@@ -136,9 +130,9 @@ func NewLineStartSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc
 	}
 }
 
-// NewLineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
+// LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
 // tokens that end with a match to the regex pattern provided
-func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
+func LineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc) bufio.SplitFunc {
 	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 		loc := re.FindIndex(data)
 		if loc == nil {
@@ -164,9 +158,9 @@ func NewLineEndSplitFunc(re *regexp.Regexp, flushAtEOF bool, trimFunc trimFunc)
 	}
 }
 
-// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
+// NewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
 // never returning an token using EOF as a terminator
-func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) {
+func NewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFunc) (bufio.SplitFunc, error) {
 	newline, err := encodedNewline(enc)
 	if err != nil {
 		return nil, err
@@ -201,6 +195,24 @@ func NewNewlineSplitFunc(enc encoding.Encoding, flushAtEOF bool, trimFunc trimFu
 	}, nil
 }
 
+// NoSplitFunc returns a bufio.SplitFunc that does not split: it returns all available bytes as a single token, up to maxLogSize. It is used when the encoding is nop
+func NoSplitFunc(maxLogSize int) bufio.SplitFunc {
+	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
+		if len(data) >= maxLogSize {
+			return maxLogSize, data[:maxLogSize], nil
+		}
+
+		if !atEOF {
+			return 0, nil, nil
+		}
+
+		if len(data) == 0 {
+			return 0, nil, nil
+		}
+		return len(data), data, nil
+	}
+}
+
 func encodedNewline(enc encoding.Encoding) ([]byte, error) {
 	out := make([]byte, 10)
 	nDst, _, err := enc.NewEncoder().Transform(out, []byte{'\n'}, true)
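Callers of `MultilineConfig.Build` simply drop the `force *Flusher` argument; the pattern-based splitting itself is unchanged. A sketch of the new call shape, where the `LOGSTART` pattern and the input are invented for illustration:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
	"golang.org/x/text/encoding/unicode"
)

func main() {
	cfg := tokenize.NewMultilineConfig()
	cfg.LineStartPattern = "^LOGSTART"

	// Signature after this change: enc, flushAtEOF, preserveLeading,
	// preserveTrailing, maxLogSize. There is no *Flusher parameter.
	splitFunc, err := cfg.Build(unicode.UTF8, false, false, false, 1024)
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("LOGSTART one\ncontinued\nLOGSTART two\n"))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Printf("%q\n", scanner.Text())
	}
	// Only the first entry is emitted: with flushAtEOF=false the final
	// entry waits for the next line-start match (or a wrapping flusher).
}
```

Force-flushing is now layered on separately: wrap the returned split function with `FlusherConfig.Wrap` when needed, which is exactly what the updated tests below do.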
diff --git a/pkg/stanza/tokenize/multiline_test.go b/pkg/stanza/tokenize/multiline_test.go
index 23482f25ba9bd..04ffe7b7ddb9b 100644
--- a/pkg/stanza/tokenize/multiline_test.go
+++ b/pkg/stanza/tokenize/multiline_test.go
@@ -27,7 +27,7 @@ const (
 
 type MultiLineTokenizerTestCase struct {
 	tokenizetest.TestCase
-	Flusher *Flusher
+	Flusher *FlusherConfig
 }
 
 func TestLineStartSplitFunc(t *testing.T) {
@@ -165,9 +165,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -181,9 +179,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -196,9 +192,7 @@
 				AdditionalIterations: 1,
 				Sleep:                forcePeriod / 4,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod * 16,
-			},
+			&FlusherConfig{Period: 16 * forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -212,9 +206,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 	}
 
@@ -223,13 +215,16 @@ func TestLineStartSplitFunc(t *testing.T) {
 			LineStartPattern: tc.Pattern,
 		}
 
-		splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+		splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
 		require.NoError(t, err)
+		if tc.Flusher != nil {
+			splitFunc = tc.Flusher.Wrap(splitFunc)
+		}
 		t.Run(tc.Name, tc.Run(splitFunc))
 	}
 
 	t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) {
-		splitFunc := NewLineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, noTrim)
+		splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, noTrim)
 		data := []byte(`LOGSTART`)
 
 		t.Run("NotAtEOF", func(t *testing.T) {
@@ -365,7 +360,7 @@ func TestLineEndSplitFunc(t *testing.T) {
 			Pattern: `^LOGEND.*$`,
 			Input:   []byte("LOGPART log1\nLOGPART log1\t \n"),
 		},
-		&Flusher{},
+		nil,
 	},
 	{
 		tokenizetest.TestCase{
@@ -379,9 +374,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -396,9 +389,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -412,9 +403,7 @@
 				AdditionalIterations: 1,
 				Sleep:                forcePeriod / 4,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod * 16,
-			},
+			&FlusherConfig{Period: 16 * forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{
@@ -428,9 +417,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 	}
 
@@ -439,8 +426,11 @@ func TestLineEndSplitFunc(t *testing.T) {
 			LineEndPattern: tc.Pattern,
 		}
 
-		splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, tc.Flusher, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
+		splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
 		require.NoError(t, err)
+		if tc.Flusher != nil {
+			splitFunc = tc.Flusher.Wrap(splitFunc)
+		}
 		t.Run(tc.Name, tc.Run(splitFunc))
 	}
 }
@@ -531,8 +521,7 @@ func TestNewlineSplitFunc(t *testing.T) {
 			tokenizetest.TestCase{Name: "LogsWithoutFlusher",
 				Input: []byte("LOGPART log1"),
 			},
-
-			&Flusher{},
+			nil,
 		},
 		{
 			tokenizetest.TestCase{Name: "LogsWithFlusher",
@@ -543,9 +532,7 @@
 				AdditionalIterations: 1,
 				Sleep:                sleepDuration,
 			},
-			&Flusher{
-				forcePeriod: forcePeriod,
-			},
+			&FlusherConfig{Period: forcePeriod},
 		},
 		{
 			tokenizetest.TestCase{Name: "DefaultFlusherSplits",
@@ -604,10 +591,10 @@
 	}
 
 	for _, tc := range testCases {
-		splitFunc, err := NewNewlineSplitFunc(unicode.UTF8, false, getTrimFunc(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces))
+		splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, getTrimFunc(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces))
 		require.NoError(t, err)
 		if tc.Flusher != nil {
-			splitFunc = tc.Flusher.SplitFunc(splitFunc)
+			splitFunc = tc.Flusher.Wrap(splitFunc)
 		}
 		t.Run(tc.Name, tc.Run(splitFunc))
 	}
@@ -669,7 +656,7 @@ func TestNoSplitFunc(t *testing.T) {
 	}
 
 	for _, tc := range testCases {
-		splitFunc := SplitNone(largeLogSize)
+		splitFunc := NoSplitFunc(largeLogSize)
 		t.Run(tc.Name, tc.Run(splitFunc))
 	}
 }
@@ -679,14 +666,14 @@ func TestNoopEncodingError(t *testing.T) {
 		LineEndPattern: "\n",
 	}
 
-	_, err := cfg.getSplitFunc(encoding.Nop, false, nil, 0, false, false)
+	_, err := cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
 	require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))
 
 	cfg = &MultilineConfig{
 		LineStartPattern: "\n",
 	}
 
-	_, err = cfg.getSplitFunc(encoding.Nop, false, nil, 0, false, false)
+	_, err = cfg.getSplitFunc(encoding.Nop, false, 0, false, false)
 	require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))
 }
@@ -747,7 +734,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
-			splitFunc, err := NewNewlineSplitFunc(tc.encoding, false, noTrim)
+			splitFunc, err := NewlineSplitFunc(tc.encoding, false, noTrim)
 			require.NoError(t, err)
 			scanner := bufio.NewScanner(bytes.NewReader(tc.input))
 			scanner.Split(splitFunc)
diff --git a/pkg/stanza/tokenize/splitter.go b/pkg/stanza/tokenize/splitter.go
index 8f58402e53897..2a7b66090ecfe 100644
--- a/pkg/stanza/tokenize/splitter.go
+++ b/pkg/stanza/tokenize/splitter.go
@@ -23,51 +23,21 @@ func NewSplitterConfig() SplitterConfig {
 	return SplitterConfig{
 		Encoding:  "utf-8",
 		Multiline: NewMultilineConfig(),
-		Flusher:   NewFlusherConfig(),
+		Flusher:   FlusherConfig{Period: DefaultFlushPeriod},
 	}
 }
 
-// Build builds Splitter struct
-func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (*Splitter, error) {
+// Build builds a bufio.SplitFunc based on the config
+func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
 	enc, err := decoder.LookupEncoding(c.Encoding)
 	if err != nil {
 		return nil, err
 	}
 
-	flusher := c.Flusher.Build()
-	splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, flusher, maxLogSize)
+	splitFunc, err := c.Multiline.Build(enc, flushAtEOF, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, maxLogSize)
 	if err != nil {
 		return nil, err
 	}
 
-	return &Splitter{
-		Decoder:   decoder.New(enc),
-		Flusher:   flusher,
-		SplitFunc: splitFunc,
-	}, nil
-}
-
-// Splitter consolidates Flusher and dependent splitFunc
-type Splitter struct {
-	Decoder   *decoder.Decoder
-	SplitFunc bufio.SplitFunc
-	Flusher   *Flusher
-}
-
-// SplitNone doesn't split any of the bytes, it reads in all of the bytes and returns it all at once. This is for when the encoding is nop
-func SplitNone(maxLogSize int) bufio.SplitFunc {
-	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-		if len(data) >= maxLogSize {
-			return maxLogSize, data[:maxLogSize], nil
-		}
-
-		if !atEOF {
-			return 0, nil, nil
-		}
-
-		if len(data) == 0 {
-			return 0, nil, nil
-		}
-		return len(data), data, nil
-	}
+	return c.Flusher.Wrap(splitFunc), nil
 }
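With the `Splitter` struct removed, `SplitterConfig.Build` now returns a ready-to-use `bufio.SplitFunc` with the flusher already applied, rather than a `*Splitter` bundling decoder, flusher, and split function. A migration sketch for a hypothetical caller (the "before" line reflects the removed API shown above):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func main() {
	cfg := tokenize.NewSplitterConfig() // utf-8, newline splitting, default flush period

	// Before: s, err := cfg.Build(false, 1024); scanner.Split(s.SplitFunc)
	// After: the returned value is the split function itself.
	splitFunc, err := cfg.Build(false, 1024) // flushAtEOF=false, maxLogSize=1024
	if err != nil {
		panic(err)
	}

	scanner := bufio.NewScanner(strings.NewReader("one\ntwo\n"))
	scanner.Split(splitFunc)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}
```

Callers that used the old `Splitter.Decoder` field can construct a decoder directly with `decoder.New(enc)`, the same call the removed code made.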