From 5c2184fe1fd41423afe154a5e04f35800a525f8a Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Tue, 27 Dec 2022 10:32:06 -0500 Subject: [PATCH] [processor/logstransform] refactor to pass along logs asynchronously (#17079) * Make the logs transform processor more reliable by making it asynchronous --- ...ogs-transform-processor-more-reliable.yaml | 16 ++++ processor/logstransformprocessor/factory.go | 15 +--- processor/logstransformprocessor/processor.go | 89 +++++++++---------- .../logstransformprocessor/processor_test.go | 9 +- 4 files changed, 59 insertions(+), 70 deletions(-) create mode 100644 .chloggen/make-logs-transform-processor-more-reliable.yaml diff --git a/.chloggen/make-logs-transform-processor-more-reliable.yaml b/.chloggen/make-logs-transform-processor-more-reliable.yaml new file mode 100644 index 0000000000000..d6c19cd3f6218 --- /dev/null +++ b/.chloggen/make-logs-transform-processor-more-reliable.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: logstransformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Lets the logs transform processor directly pass messages to the next consumer, avoiding the timing issues it previously exhibited. + +# One or more tracking issues related to the change +issues: [16604, 15378, 9761] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: diff --git a/processor/logstransformprocessor/factory.go b/processor/logstransformprocessor/factory.go index 2bfe3651c58dd..6046dd77386b5 100644 --- a/processor/logstransformprocessor/factory.go +++ b/processor/logstransformprocessor/factory.go @@ -21,7 +21,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processorhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -67,17 +66,5 @@ func createLogsProcessor( return nil, errors.New("no operators were configured for this logs transform processor") } - proc := &logsTransformProcessor{ - logger: set.Logger, - config: pCfg, - } - return processorhelper.NewLogsProcessor( - ctx, - set, - cfg, - nextConsumer, - proc.processLogs, - processorhelper.WithStart(proc.Start), - processorhelper.WithShutdown(proc.Shutdown), - processorhelper.WithCapabilities(processorCapabilities)) + return newProcessor(pCfg, nextConsumer, set.Logger) } diff --git a/processor/logstransformprocessor/processor.go b/processor/logstransformprocessor/processor.go index 2db83b8271ce3..fd0117fce0604 100644 --- a/processor/logstransformprocessor/processor.go +++ b/processor/logstransformprocessor/processor.go @@ -17,12 +17,12 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr import ( "context" "errors" - "fmt" "math" "runtime" "sync" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" @@ -32,22 +32,45 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" ) -type outputType struct { - logs plog.Logs - err error -} - type logsTransformProcessor struct { logger *zap.Logger config 
*Config + consumer consumer.Logs + pipe *pipeline.DirectedPipeline firstOperator operator.Operator emitter *adapter.LogEmitter converter *adapter.Converter fromConverter *adapter.FromPdataConverter wg sync.WaitGroup - outputChannel chan outputType +} + +func newProcessor(config *Config, nextConsumer consumer.Logs, logger *zap.Logger) (*logsTransformProcessor, error) { + p := &logsTransformProcessor{ + logger: logger, + config: config, + consumer: nextConsumer, + } + + baseCfg := p.config.BaseConfig + + p.emitter = adapter.NewLogEmitter(p.logger.Sugar()) + pipe, err := pipeline.Config{ + Operators: baseCfg.Operators, + DefaultOutput: p.emitter, + }.Build(p.logger.Sugar()) + if err != nil { + return nil, err + } + + p.pipe = pipe + + return p, nil +} + +func (ltp *logsTransformProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} } func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error { @@ -61,25 +84,14 @@ func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error { } func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Host) error { - baseCfg := ltp.config.BaseConfig - - ltp.emitter = adapter.NewLogEmitter(ltp.logger.Sugar()) - pipe, err := pipeline.Config{ - Operators: baseCfg.Operators, - DefaultOutput: ltp.emitter, - }.Build(ltp.logger.Sugar()) - if err != nil { - return err - } // There is no need for this processor to use storage - err = pipe.Start(storage.NewNopClient()) + err := ltp.pipe.Start(storage.NewNopClient()) if err != nil { return err } - ltp.pipe = pipe - pipelineOperators := pipe.Operators() + pipelineOperators := ltp.pipe.Operators() if len(pipelineOperators) == 0 { return errors.New("processor requires at least one operator to be configured") } @@ -93,8 +105,6 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger) ltp.fromConverter.Start() - 
ltp.outputChannel = make(chan outputType) - // Below we're starting 3 loops: // * first which reads all the logs translated by the fromConverter and then forwards // them to pipeline @@ -110,37 +120,16 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos // ... // * third which reads all the logs produced by the converter - // (aggregated by Resource) and then places them on the outputChannel + // (aggregated by Resource) and then places them on the next consumer ltp.wg.Add(1) go ltp.consumerLoop(ctx) return nil } -func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { +func (ltp *logsTransformProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { // Add the logs to the chain - err := ltp.fromConverter.Batch(ld) - if err != nil { - return ld, err - } - - doneChan := ctx.Done() - for { - select { - case <-doneChan: - ltp.logger.Debug("loop stopped") - return ld, errors.New("processor interrupted") - case output, ok := <-ltp.outputChannel: - if !ok { - return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline") - } - if output.err != nil { - return ld, err - } - - return output.logs, nil - } - } + return ltp.fromConverter.Batch(ld) } // converterLoop reads the log entries produced by the fromConverter and sends them @@ -163,7 +152,7 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) { for _, e := range entries { // Add item to the first operator of the pipeline manually if err := ltp.firstOperator.Process(ctx, e); err != nil { - ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the pipeline: %w", err)} + ltp.logger.Error("processor encountered an issue with the pipeline", zap.Error(err)) break } } @@ -188,7 +177,7 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) { } if err := ltp.converter.Batch(e); err != nil { - ltp.outputChannel <- outputType{err: 
fmt.Errorf("processor encountered an issue with the converter: %w", err)} + ltp.logger.Error("processor encountered an issue with the converter", zap.Error(err)) } } } @@ -210,7 +199,9 @@ func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) { return } - ltp.outputChannel <- outputType{logs: pLogs, err: nil} + if err := ltp.consumer.ConsumeLogs(ctx, pLogs); err != nil { + ltp.logger.Error("processor encountered an issue with next consumer", zap.Error(err)) + } } } } diff --git a/processor/logstransformprocessor/processor_test.go b/processor/logstransformprocessor/processor_test.go index b474866947e89..c0a5b7bf1eb03 100644 --- a/processor/logstransformprocessor/processor_test.go +++ b/processor/logstransformprocessor/processor_test.go @@ -76,17 +76,11 @@ type testLogMessage struct { attributes *map[string]pcommon.Value } -// Temporary abstraction to avoid "unused" linter -var skip = func(t *testing.T, why string) { - t.Skip(why) -} - func TestLogsTransformProcessor(t *testing.T) { - skip(t, "Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9761") baseMessage := pcommon.NewValueStr("2022-01-01 01:02:03 INFO this is a test message") spanID := pcommon.SpanID([8]byte{0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff}) traceID := pcommon.TraceID([16]byte{0x48, 0x01, 0x40, 0xf3, 0xd7, 0x70, 0xa5, 0xae, 0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff}) - infoSeverityText := "Info" + infoSeverityText := "INFO" tests := []struct { name string @@ -163,6 +157,7 @@ func TestLogsTransformProcessor(t *testing.T) { wantLogData := generateLogData(tt.parsedMessages) err = ltp.ConsumeLogs(context.Background(), sourceLogData) require.NoError(t, err) + time.Sleep(200 * time.Millisecond) logs := tln.AllLogs() require.Len(t, logs, 1)