Skip to content

Commit

Permalink
[chore][fileconsumer] Consolidate reader factory's header code (#24020)
Browse files Browse the repository at this point in the history
This consolidates common conditions into a simpler structure.
  • Loading branch information
djaglowski committed Jul 7, 2023
1 parent d3f5cd4 commit c1a4094
Showing 1 changed file with 28 additions and 37 deletions.
65 changes: 28 additions & 37 deletions pkg/stanza/fileconsumer/reader_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type readerBuilder struct {
}

func (f *readerFactory) newReaderBuilder() *readerBuilder {
return &readerBuilder{readerFactory: f}
return &readerBuilder{readerFactory: f, headerAttributes: map[string]any{}}
}

func (b *readerBuilder) withSplitterFunc(s bufio.SplitFunc) *readerBuilder {
Expand Down Expand Up @@ -113,13 +113,6 @@ func (b *readerBuilder) build() (r *Reader, err error) {
}
}

if b.headerSettings != nil && !b.headerFinalized {
// If we are reading the header, we should start with the header split func
r.splitFunc = b.headerSettings.splitFunc
} else {
r.splitFunc = r.lineSplitFunc
}

enc, err := b.encodingConfig.Build()
if err != nil {
return
Expand All @@ -136,7 +129,7 @@ func (b *readerBuilder) build() (r *Reader, err error) {

// unsafeReader has the file set to nil, so don't try emending its offset.
if !b.fromBeginning {
if err := r.offsetToEnd(); err != nil {
if err = r.offsetToEnd(); err != nil {
return nil, err
}
}
Expand All @@ -145,47 +138,45 @@ func (b *readerBuilder) build() (r *Reader, err error) {
r.FileAttributes = &FileAttributes{}
}

if b.headerAttributes != nil {
r.FileAttributes.HeaderAttributes = b.headerAttributes
} else {
r.FileAttributes.HeaderAttributes = map[string]any{}
}

if b.fp != nil {
r.Fingerprint = b.fp
} else if b.file != nil {
fp, err := b.readerFactory.newFingerprint(r.file)
r.Fingerprint, err = b.readerFactory.newFingerprint(r.file)
if err != nil {
return nil, err
}
r.Fingerprint = fp
}

// Create the header pipeline if we need it
// (if we are doing header parsing (headerSettings != nil), and if the header is not yet finalized)
if b.headerSettings != nil && !b.headerFinalized {
outOp := newHeaderPipelineOutput(b.SugaredLogger)
p, err := pipeline.Config{
Operators: b.headerSettings.config.MetadataOperators,
DefaultOutput: outOp,
}.Build(b.SugaredLogger)
r.FileAttributes.HeaderAttributes = b.headerAttributes

if err != nil {
return nil, fmt.Errorf("failed to build pipeline: %w", err)
}
if b.headerSettings == nil || b.headerFinalized {
r.splitFunc = r.lineSplitFunc
r.processFunc = b.readerConfig.emit
return r, nil
}

if err := p.Start(storage.NewNopClient()); err != nil {
return nil, fmt.Errorf("failed to start header pipeline: %w", err)
}
// We are reading the header so we should start with the header split func
r.splitFunc = b.headerSettings.splitFunc

r.headerPipeline = p
r.headerPipelineOutput = outOp
outOp := newHeaderPipelineOutput(b.SugaredLogger)
p, err := pipeline.Config{
Operators: b.headerSettings.config.MetadataOperators,
DefaultOutput: outOp,
}.Build(b.SugaredLogger)

// Set initial emit func to header function
r.processFunc = r.consumeHeaderLine
} else {
r.processFunc = b.readerConfig.emit
if err != nil {
return nil, fmt.Errorf("failed to build pipeline: %w", err)
}

if err := p.Start(storage.NewNopClient()); err != nil {
return nil, fmt.Errorf("failed to start header pipeline: %w", err)
}

r.headerPipeline = p
r.headerPipelineOutput = outOp

// Set initial emit func to header function
r.processFunc = r.consumeHeaderLine

return r, nil
}

0 comments on commit c1a4094

Please sign in to comment.