[pkg/stanza] Remove converter options (open-telemetry#15696)
djaglowski authored and shalper2 committed Dec 6, 2022
1 parent 3bc0e5d commit 9bc88ee
Showing 26 changed files with 85 additions and 316 deletions.
19 changes: 19 additions & 0 deletions .chloggen/filelog-rm-converter-config.yaml
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza, filelog, journald, syslog, tcplog, udplog, windowseventlog, logstransform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove ability to configure `converter`.

# One or more tracking issues related to the change
issues: [15696]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The design of the converter is opaque, and its behavior may change in the future.
  Because of this, the `converter` settings are deemed unstable. The converter's actual
  behavior is unchanged, but it now always uses the former default values.
18 changes: 0 additions & 18 deletions pkg/stanza/adapter/config.go
@@ -15,8 +15,6 @@
 package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
 
 import (
-	"time"
-
 	"go.opentelemetry.io/collector/config"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -26,21 +24,5 @@ import (
 type BaseConfig struct {
 	config.ReceiverSettings `mapstructure:",squash"`
 	Operators []operator.Config `mapstructure:"operators"`
-	Converter ConverterConfig `mapstructure:"converter"`
 	StorageID *config.ComponentID `mapstructure:"storage"`
 }
-
-// ConverterConfig controls how the internal entry.Entry to plog.Logs converter
-// works.
-type ConverterConfig struct {
-	// MaxFlushCount defines the maximum number of entries that can be
-	// accumulated before flushing them for further processing.
-	MaxFlushCount uint `mapstructure:"max_flush_count"`
-	// FlushInterval defines how often to flush the converted and accumulated
-	// log entries.
-	FlushInterval time.Duration `mapstructure:"flush_interval"`
-	// WorkerCount defines how many worker goroutines used for entry.Entry to
-	// log records translation should be spawned.
-	// By default: math.Max(1, runtime.NumCPU()/4) workers are spawned.
-	WorkerCount int `mapstructure:"worker_count"`
-}
34 changes: 3 additions & 31 deletions pkg/stanza/adapter/converter.go
@@ -98,44 +98,16 @@ type Converter struct
 	logger *zap.Logger
 }
 
-type ConverterOption interface {
-	apply(*Converter)
-}
-
-type optionFunc func(*Converter)
-
-func (f optionFunc) apply(c *Converter) {
-	f(c)
-}
-
-func WithLogger(logger *zap.Logger) ConverterOption {
-	return optionFunc(func(c *Converter) {
-		c.logger = logger
-	})
-}
-
-func WithWorkerCount(workerCount int) ConverterOption {
-	return optionFunc(func(c *Converter) {
-		c.workerCount = workerCount
-	})
-}
-
-func NewConverter(opts ...ConverterOption) *Converter {
-	c := &Converter{
+func NewConverter(logger *zap.Logger) *Converter {
+	return &Converter{
 		workerChan: make(chan []*entry.Entry),
 		workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
 		aggregationChan: make(chan []workerItem),
 		pLogsChan: make(chan plog.Logs),
 		stopChan: make(chan struct{}),
-		logger: zap.NewNop(),
 		flushChan: make(chan plog.Logs),
+		logger: logger,
 	}
-
-	for _, opt := range opts {
-		opt.apply(c)
-	}
-
-	return c
 }
 
 func (c *Converter) Start() {
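
For code that constructs the converter directly, the functional options above are replaced by a single logger argument. The following is a minimal sketch of the new call pattern, using only identifiers visible in this diff (NewConverter, Start, Stop) and assuming a pkg/stanza version that includes this change:

package main

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
)

func main() {
	// WithLogger and WithWorkerCount no longer exist; the logger is passed
	// directly, and the worker count is always max(1, NumCPU/4).
	converter := adapter.NewConverter(zap.NewNop())
	converter.Start()
	defer converter.Stop()
}

Callers that previously tuned worker_count now simply get the default shown in the constructor.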
11 changes: 4 additions & 7 deletions pkg/stanza/adapter/converter_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/plog"
+	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
 )
@@ -434,9 +435,7 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
 			t.Parallel()
 
-			converter := NewConverter(
-				WithWorkerCount(1),
-			)
+			converter := NewConverter(zap.NewNop())
 			converter.Start()
 			defer converter.Stop()
 
@@ -498,7 +497,7 @@ func TestAllConvertedEntriesAreSentAndReceived(t *testing.T) {
 }
 
 func TestConverterCancelledContextCancellsTheFlush(t *testing.T) {
-	converter := NewConverter()
+	converter := NewConverter(zap.NewNop())
 	converter.Start()
 	defer converter.Stop()
 	var wg sync.WaitGroup
@@ -890,9 +889,7 @@ func BenchmarkConverter(b *testing.B) {
 		b.Run(fmt.Sprintf("worker_count=%d", wc), func(b *testing.B) {
 			for i := 0; i < b.N; i++ {
 
-				converter := NewConverter(
-					WithWorkerCount(wc),
-				)
+				converter := NewConverter(zap.NewNop())
 				converter.Start()
 				defer converter.Stop()
 				b.ResetTimer()
36 changes: 3 additions & 33 deletions pkg/stanza/adapter/emitter.go
@@ -39,43 +39,19 @@ type LogEmitter struct
 	flushInterval time.Duration
 }
 
-type LogEmitterOption func(*LogEmitter)
-
 var (
 	defaultFlushInterval = 100 * time.Millisecond
 	defaultMaxBatchSize uint = 100
 )
 
-// LogEmitterWithMaxBatchSize returns an option that makes the LogEmitter use the specified max batch size
-func LogEmitterWithMaxBatchSize(maxBatchSize uint) LogEmitterOption {
-	return LogEmitterOption(func(le *LogEmitter) {
-		le.maxBatchSize = maxBatchSize
-		le.batch = make([]*entry.Entry, 0, maxBatchSize)
-	})
-}
-
-// LogEmitterWithFlushInterval returns an option that makes the LogEmitter use the specified flush interval
-func LogEmitterWithFlushInterval(flushInterval time.Duration) LogEmitterOption {
-	return LogEmitterOption(func(le *LogEmitter) {
-		le.flushInterval = flushInterval
-	})
-}
-
-// LogEmitterWithLogger returns an option that makes the LogEmitter use the specified logger
-func LogEmitterWithLogger(logger *zap.SugaredLogger) LogEmitterOption {
-	return LogEmitterOption(func(le *LogEmitter) {
-		le.OutputOperator.BasicOperator.SugaredLogger = logger
-	})
-}
-
 // NewLogEmitter creates a new receiver output
-func NewLogEmitter(opts ...LogEmitterOption) *LogEmitter {
-	le := &LogEmitter{
+func NewLogEmitter(logger *zap.SugaredLogger) *LogEmitter {
+	return &LogEmitter{
 		OutputOperator: helper.OutputOperator{
 			BasicOperator: helper.BasicOperator{
 				OperatorID: "log_emitter",
 				OperatorType: "log_emitter",
-				SugaredLogger: zap.NewNop().Sugar(),
+				SugaredLogger: logger,
 			},
 		},
 		logChan: make(chan []*entry.Entry),
@@ -84,12 +60,6 @@ func NewLogEmitter(opts ...LogEmitterOption) *LogEmitter {
 		flushInterval: defaultFlushInterval,
 		cancel: func() {},
 	}
-
-	for _, opt := range opts {
-		opt(le)
-	}
-
-	return le
 }
 
 // Start starts the goroutine(s) required for this operator
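
The emitter changes the same way: batch size and flush interval can no longer be injected and always use the package defaults above (100 entries, 100 ms). A minimal sketch of the new constructor, with Start(nil) and Stop() taken from the test usage in this commit:

package main

import (
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
)

func main() {
	// LogEmitterWithLogger, LogEmitterWithMaxBatchSize, and
	// LogEmitterWithFlushInterval are removed; only the logger is supplied.
	emitter := adapter.NewLogEmitter(zap.NewNop().Sugar())
	if err := emitter.Start(nil); err != nil {
		panic(err)
	}
	defer func() {
		_ = emitter.Stop()
	}()
}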
60 changes: 3 additions & 57 deletions pkg/stanza/adapter/emitter_test.go
@@ -26,9 +26,7 @@ import (
 )
 
 func TestLogEmitter(t *testing.T) {
-	emitter := NewLogEmitter(
-		LogEmitterWithLogger(zaptest.NewLogger(t).Sugar()),
-	)
+	emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
 
 	require.NoError(t, emitter.Start(nil))
 
@@ -50,60 +48,12 @@ func TestLogEmitter(t *testing.T) {
 	}
 }
 
-func TestLogEmitterRespectsMaxBatchSize(t *testing.T) {
-	const (
-		numEntries = 1111
-		maxBatchSize = 100
-		timeout = time.Second
-	)
-	emitter := NewLogEmitter(
-		LogEmitterWithLogger(zaptest.NewLogger(t).Sugar()),
-		LogEmitterWithMaxBatchSize(maxBatchSize),
-		LogEmitterWithFlushInterval(100*time.Millisecond),
-	)
-
-	require.NoError(t, emitter.Start(nil))
-	defer func() {
-		require.NoError(t, emitter.Stop())
-	}()
-
-	entries := complexEntries(numEntries)
-
-	go func() {
-		ctx := context.Background()
-		for _, e := range entries {
-			require.NoError(t, emitter.Process(ctx, e))
-		}
-	}()
-
-	entriesReceived := 0
-	timeoutChan := time.After(timeout)
-
-	for entriesReceived < numEntries {
-		select {
-		case recv := <-emitter.logChan:
-			entriesReceived += len(recv)
-			if len(recv) > maxBatchSize {
-				require.FailNow(t, "Expected only %d entries per batch, but got %d", maxBatchSize, entriesReceived)
-			}
-		case <-timeoutChan:
-			require.FailNow(t, "Failed to receive all log entries before timeout")
-		}
-	}
-
-	require.Equal(t, numEntries, entriesReceived)
-}
-
 func TestLogEmitterEmitsOnMaxBatchSize(t *testing.T) {
 	const (
 		maxBatchSize = 100
 		timeout = time.Second
 	)
-	emitter := NewLogEmitter(
-		LogEmitterWithLogger(zaptest.NewLogger(t).Sugar()),
-		LogEmitterWithMaxBatchSize(maxBatchSize),
-		LogEmitterWithFlushInterval(time.Hour),
-	)
+	emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
 
 	require.NoError(t, emitter.Start(nil))
 	defer func() {
@@ -134,11 +84,7 @@ func TestLogEmitterEmitsOnFlushInterval(t *testing.T) {
 		flushInterval = 100 * time.Millisecond
 		timeout = time.Second
 	)
-	emitter := NewLogEmitter(
-		LogEmitterWithLogger(zaptest.NewLogger(t).Sugar()),
-		LogEmitterWithMaxBatchSize(100),
-		LogEmitterWithFlushInterval(flushInterval),
-	)
+	emitter := NewLogEmitter(zaptest.NewLogger(t).Sugar())
 
 	require.NoError(t, emitter.Start(nil))
 	defer func() {
23 changes: 2 additions & 21 deletions pkg/stanza/adapter/factory.go
@@ -55,19 +55,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
 
 	operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)
 
-	emitterOpts := []LogEmitterOption{
-		LogEmitterWithLogger(params.Logger.Sugar()),
-	}
-
-	if baseCfg.Converter.MaxFlushCount > 0 {
-		emitterOpts = append(emitterOpts, LogEmitterWithMaxBatchSize(baseCfg.Converter.MaxFlushCount))
-	}
-
-	if baseCfg.Converter.FlushInterval > 0 {
-		emitterOpts = append(emitterOpts, LogEmitterWithFlushInterval(baseCfg.Converter.FlushInterval))
-	}
-
-	emitter := NewLogEmitter(emitterOpts...)
+	emitter := NewLogEmitter(params.Logger.Sugar())
 	pipe, err := pipeline.Config{
 		Operators: operators,
 		DefaultOutput: emitter,
@@ -76,14 +64,7 @@ func createLogsReceiver(logReceiverType LogReceiverType) component.CreateLogsRec
 		return nil, err
 	}
 
-	opts := []ConverterOption{
-		WithLogger(params.Logger),
-	}
-
-	if baseCfg.Converter.WorkerCount > 0 {
-		opts = append(opts, WithWorkerCount(baseCfg.Converter.WorkerCount))
-	}
-	converter := NewConverter(opts...)
+	converter := NewConverter(params.Logger)
 	obsrecv := obsreport.MustNewReceiver(obsreport.ReceiverSettings{
 		ReceiverID: cfg.ID(),
 		ReceiverCreateSettings: params,
13 changes: 0 additions & 13 deletions pkg/stanza/adapter/factory_test.go
@@ -17,7 +17,6 @@ package adapter
 import (
 	"context"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
@@ -43,18 +42,6 @@ func TestCreateReceiver(t *testing.T) {
 		require.NotNil(t, receiver, "receiver creation failed")
 	})
 
-	t.Run("Success with ConverterConfig", func(t *testing.T) {
-		factory := NewFactory(TestReceiverType{}, component.StabilityLevelInDevelopment)
-		cfg := factory.CreateDefaultConfig().(*TestConfig)
-		cfg.Converter = ConverterConfig{
-			MaxFlushCount: 1,
-			FlushInterval: 3 * time.Second,
-		}
-		receiver, err := factory.CreateLogsReceiver(context.Background(), componenttest.NewNopReceiverCreateSettings(), cfg, consumertest.NewNop())
-		require.NoError(t, err, "receiver creation failed")
-		require.NotNil(t, receiver, "receiver creation failed")
-	})
-
 	t.Run("DecodeOperatorConfigsFailureMissingFields", func(t *testing.T) {
 		factory := NewFactory(TestReceiverType{}, component.StabilityLevelInDevelopment)
 		badCfg := factory.CreateDefaultConfig().(*TestConfig)
