Skip to content

Commit

Permalink
[chore][pkg/stanza] Simplify tokenize package (open-telemetry#26040)
Browse files Browse the repository at this point in the history
This PR contains several simplifications within and relating to the new
tokenization package:
- `tokenize.Multiline` now returns a `bufio.SplitFunc` without concern
for a flush func. A flush func is applied later if necessary.
- Remove `tokenize.Splitter`. `SplitterConfig` can now generate a
`bufio.SplitFunc` directly.
- Unexport `tokenize.Flusher`. `FlusherConfig` can now wrap a
`bufio.SplitFunc` directly. (It does still create a struct to maintain
state, but then wraps the split func with a receiver that interacts with
this state.)
- `fileconsumer`'s internal `splitter` package was also simplified. It
may make sense to merge this into `tokenize` in a future PR, but this PR
is large enough.
  • Loading branch information
djaglowski committed Aug 30, 2023
1 parent 5645f1e commit f6f1a24
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 158 deletions.
6 changes: 3 additions & 3 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
}

// Ensure that splitter is buildable
factory := splitter.NewMultilineFactory(c.Splitter)
if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
factory := splitter.NewMultilineFactory(c.Splitter, int(c.MaxLogSize))
if _, err := factory.Build(); err != nil {
return nil, err
}

Expand All @@ -109,7 +109,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback

// Ensure that splitter is buildable
factory := splitter.NewCustomFactory(c.Splitter.Flusher, splitFunc)
if _, err := factory.Build(int(c.MaxLogSize)); err != nil {
if _, err := factory.Build(); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/header/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewConfig(matchRegex string, metadataOperators []operator.Config, enc encod
return nil, fmt.Errorf("failed to compile `pattern`: %w", err)
}

splitFunc, err := tokenize.NewNewlineSplitFunc(enc, false, func(b []byte) []byte {
splitFunc, err := tokenize.NewlineSplitFunc(enc, false, func(b []byte) []byte {
return bytes.Trim(b, "\r\n")
})
if err != nil {
Expand Down
17 changes: 6 additions & 11 deletions pkg/stanza/fileconsumer/internal/splitter/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,17 @@ import (
)

type customFactory struct {
Flusher tokenize.FlusherConfig
Splitter bufio.SplitFunc
flusherCfg tokenize.FlusherConfig
splitFunc bufio.SplitFunc
}

var _ Factory = (*customFactory)(nil)

func NewCustomFactory(
flusher tokenize.FlusherConfig,
splitter bufio.SplitFunc) Factory {
return &customFactory{
Flusher: flusher,
Splitter: splitter,
}
func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory {
return &customFactory{flusherCfg: flusherCfg, splitFunc: splitFunc}
}

// Build builds Multiline Splitter struct
func (factory *customFactory) Build(_ int) (bufio.SplitFunc, error) {
return factory.Flusher.Build().SplitFunc(factory.Splitter), nil
func (f *customFactory) Build() (bufio.SplitFunc, error) {
return f.flusherCfg.Wrap(f.splitFunc), nil
}
9 changes: 1 addition & 8 deletions pkg/stanza/fileconsumer/internal/splitter/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ func TestCustomFactory(t *testing.T) {
Flusher tokenize.FlusherConfig
Splitter bufio.SplitFunc
}
type args struct {
maxLogSize int
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
Expand All @@ -34,16 +30,13 @@ func TestCustomFactory(t *testing.T) {
return len(data), data, nil
},
},
args: args{
maxLogSize: 1024,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewCustomFactory(tt.fields.Flusher, tt.fields.Splitter)
got, err := factory.Build(tt.args.maxLogSize)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/splitter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
)

type Factory interface {
Build(maxLogSize int) (bufio.SplitFunc, error)
Build() (bufio.SplitFunc, error)
}
23 changes: 6 additions & 17 deletions pkg/stanza/fileconsumer/internal/splitter/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,21 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

type multilineFactory struct {
tokenize.SplitterConfig
splitterCfg tokenize.SplitterConfig
maxLogSize int
}

var _ Factory = (*multilineFactory)(nil)

func NewMultilineFactory(splitter tokenize.SplitterConfig) Factory {
return &multilineFactory{
SplitterConfig: splitter,
}
func NewMultilineFactory(splitterCfg tokenize.SplitterConfig, maxLogSize int) Factory {
return &multilineFactory{splitterCfg: splitterCfg, maxLogSize: maxLogSize}
}

// Build builds Multiline Splitter struct
func (factory *multilineFactory) Build(maxLogSize int) (bufio.SplitFunc, error) {
enc, err := decoder.LookupEncoding(factory.Encoding)
if err != nil {
return nil, err
}
flusher := factory.Flusher.Build()
splitter, err := factory.Multiline.Build(enc, false, factory.PreserveLeadingWhitespaces, factory.PreserveTrailingWhitespaces, flusher, maxLogSize)
if err != nil {
return nil, err
}
return splitter, nil
func (f *multilineFactory) Build() (bufio.SplitFunc, error) {
return f.splitterCfg.Build(false, f.maxLogSize)
}
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func TestMultilineBuild(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewMultilineFactory(tt.splitterConfig)
got, err := factory.Build(tt.args.maxLogSize)
factory := NewMultilineFactory(tt.splitterConfig, tt.args.maxLogSize)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type readerFactory struct {
}

func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
lineSplitFunc, err := f.splitterFactory.Build(f.readerConfig.maxLogSize)
lineSplitFunc, err := f.splitterFactory.Build()
if err != nil {
return nil, err
}
Expand All @@ -44,7 +44,7 @@ func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
var err error
lineSplitFunc := old.lineSplitFunc
if lineSplitFunc == nil {
lineSplitFunc, err = f.splitterFactory.Build(f.readerConfig.maxLogSize)
lineSplitFunc, err = f.splitterFactory.Build()
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
Encoding: "utf-8",
Flusher: tokenize.NewFlusherConfig(),
Multiline: mlc,
})
}, 15)
f.readerConfig.maxLogSize = 15

temp := openTemp(t, t.TempDir())
Expand Down Expand Up @@ -249,7 +249,7 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte
emit: testEmitFunc(emitChan),
},
fromBeginning: true,
splitterFactory: splitter.NewMultilineFactory(splitterConfig),
splitterFactory: splitter.NewMultilineFactory(splitterConfig, defaultMaxLogSize),
encoding: enc,
}, emitChan
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type BaseConfig struct {
type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, int(c.MaxLogSize))
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, int(c.MaxLogSize))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

// Build multiline
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, nil, MaxUDPSize)
splitFunc, err := c.Multiline.Build(enc, true, c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces, MaxUDPSize)
if err != nil {
return nil, err
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/stanza/tokenize/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"
)

const DefaultFlushPeriod = 500 * time.Millisecond

// FlusherConfig is a configuration of Flusher helper
type FlusherConfig struct {
Period time.Duration `mapstructure:"force_flush_period"`
Expand All @@ -17,21 +19,22 @@ type FlusherConfig struct {
func NewFlusherConfig() FlusherConfig {
return FlusherConfig{
// Empty or `0s` means that we will never force flush
Period: time.Millisecond * 500,
Period: DefaultFlushPeriod,
}
}

// Build creates Flusher from configuration
func (c *FlusherConfig) Build() *Flusher {
return &Flusher{
// Wrap a bufio.SplitFunc with a flusher
func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc) bufio.SplitFunc {
f := &flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
previousDataLength: 0,
}
return f.splitFunc(splitFunc)
}

// Flusher keeps information about flush state
type Flusher struct {
// flusher keeps information about flush state
type flusher struct {
// forcePeriod defines time from last flush which should pass before setting force to true.
// Never forces if forcePeriod is set to 0
forcePeriod time.Duration
Expand All @@ -45,7 +48,7 @@ type Flusher struct {
previousDataLength int
}

func (f *Flusher) UpdateDataChangeTime(length int) {
func (f *flusher) updateDataChangeTime(length int) {
// Skip if length is greater than 0 and didn't change
if length > 0 && length == f.previousDataLength {
return
Expand All @@ -58,17 +61,17 @@ func (f *Flusher) UpdateDataChangeTime(length int) {
}

// Flushed reset data length
func (f *Flusher) Flushed() {
f.UpdateDataChangeTime(0)
func (f *flusher) flushed() {
f.updateDataChangeTime(0)
}

// ShouldFlush returns true if data should be forcefully flushed
func (f *Flusher) ShouldFlush() bool {
func (f *flusher) shouldFlush() bool {
// Returns true if there is f.forcePeriod after f.lastDataChange and data length is greater than 0
return f.forcePeriod > 0 && time.Since(f.lastDataChange) > f.forcePeriod && f.previousDataLength > 0
}

func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
func (f *flusher) splitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = splitFunc(data, atEOF)

Expand All @@ -80,21 +83,21 @@ func (f *Flusher) SplitFunc(splitFunc bufio.SplitFunc) bufio.SplitFunc {
// Return token
if token != nil {
// Inform flusher that we just flushed
f.Flushed()
f.flushed()
return
}

// If there is no token, force flush eventually
if f.ShouldFlush() {
if f.shouldFlush() {
// Inform flusher that we just flushed
f.Flushed()
f.flushed()
token = trimWhitespacesFunc(data)
advance = len(data)
return
}

// Inform flusher that we didn't flush
f.UpdateDataChangeTime(len(data))
f.updateDataChangeTime(len(data))
return
}
}
Loading

0 comments on commit f6f1a24

Please sign in to comment.