Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup recombine operator files (open-telemetry#…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and rimitchell committed May 8, 2024
1 parent 6260bdf commit 75389d3
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 352 deletions.
130 changes: 130 additions & 0 deletions pkg/stanza/operator/transformer/recombine/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package recombine // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine"

import (
"bytes"
"fmt"
"sync"
"time"

"github.com/expr-lang/expr/vm"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

const (
operatorType = "recombine"
defaultCombineWith = "\n"
)

func init() {
operator.Register(operatorType, func() operator.Builder { return NewConfig() })
}

// NewConfig creates a new recombine config with default values
func NewConfig() *Config {
return NewConfigWithID(operatorType)
}

// NewConfigWithID creates a new recombine config with default values
func NewConfigWithID(operatorID string) *Config {
return &Config{
TransformerConfig: helper.NewTransformerConfig(operatorID, operatorType),
MaxBatchSize: 1000,
MaxSources: 1000,
CombineWith: defaultCombineWith,
OverwriteWith: "newest",
ForceFlushTimeout: 5 * time.Second,
SourceIdentifier: entry.NewAttributeField("file.path"),
}
}

// Config is the configuration of a recombine operator
type Config struct {
helper.TransformerConfig `mapstructure:",squash"`
IsFirstEntry string `mapstructure:"is_first_entry"`
IsLastEntry string `mapstructure:"is_last_entry"`
MaxBatchSize int `mapstructure:"max_batch_size"`
CombineField entry.Field `mapstructure:"combine_field"`
CombineWith string `mapstructure:"combine_with"`
SourceIdentifier entry.Field `mapstructure:"source_identifier"`
OverwriteWith string `mapstructure:"overwrite_with"`
ForceFlushTimeout time.Duration `mapstructure:"force_flush_period"`
MaxSources int `mapstructure:"max_sources"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
}

// Build creates a new Transformer from a config
func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
transformer, err := c.TransformerConfig.Build(logger)
if err != nil {
return nil, fmt.Errorf("failed to build transformer config: %w", err)
}

if c.IsLastEntry != "" && c.IsFirstEntry != "" {
return nil, fmt.Errorf("only one of is_first_entry and is_last_entry can be set")
}

if c.IsLastEntry == "" && c.IsFirstEntry == "" {
return nil, fmt.Errorf("one of is_first_entry and is_last_entry must be set")
}

var matchesFirst bool
var prog *vm.Program
if c.IsFirstEntry != "" {
matchesFirst = true
prog, err = helper.ExprCompileBool(c.IsFirstEntry)
if err != nil {
return nil, fmt.Errorf("failed to compile is_first_entry: %w", err)
}
} else {
matchesFirst = false
prog, err = helper.ExprCompileBool(c.IsLastEntry)
if err != nil {
return nil, fmt.Errorf("failed to compile is_last_entry: %w", err)
}
}

if c.CombineField.FieldInterface == nil {
return nil, fmt.Errorf("missing required argument 'combine_field'")
}

var overwriteWithOldest bool
switch c.OverwriteWith {
case "newest":
overwriteWithOldest = false
case "oldest", "":
overwriteWithOldest = true
default:
return nil, fmt.Errorf("invalid value '%s' for parameter 'overwrite_with'", c.OverwriteWith)
}

return &Transformer{
TransformerOperator: transformer,
matchFirstLine: matchesFirst,
prog: prog,
maxBatchSize: c.MaxBatchSize,
maxSources: c.MaxSources,
overwriteWithOldest: overwriteWithOldest,
batchMap: make(map[string]*sourceBatch),
batchPool: sync.Pool{
New: func() any {
return &sourceBatch{
recombined: &bytes.Buffer{},
}
},
},
combineField: c.CombineField,
combineWith: c.CombineWith,
forceFlushTimeout: c.ForceFlushTimeout,
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
maxLogSize: int64(c.MaxLogSize),
}, nil
}
Loading

0 comments on commit 75389d3

Please sign in to comment.