Skip to content

Commit

Permalink
[processor/logstransform] refactor to pass along logs asynchronously (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#17079)

* Make the logs transform processor more reliable by making it asynchronous
  • Loading branch information
dehaansa committed Dec 27, 2022
1 parent add0772 commit 5c2184f
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 70 deletions.
16 changes: 16 additions & 0 deletions .chloggen/make-logs-transform-processor-more-reliable.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logstransformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Lets the logs transform processor directly pass messages to the next consumer, avoiding the timing issues it previously exhibited.

# One or more tracking issues related to the change
issues: [16604, 15378, 9761]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
15 changes: 1 addition & 14 deletions processor/logstransformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -67,17 +66,5 @@ func createLogsProcessor(
return nil, errors.New("no operators were configured for this logs transform processor")
}

proc := &logsTransformProcessor{
logger: set.Logger,
config: pCfg,
}
return processorhelper.NewLogsProcessor(
ctx,
set,
cfg,
nextConsumer,
proc.processLogs,
processorhelper.WithStart(proc.Start),
processorhelper.WithShutdown(proc.Shutdown),
processorhelper.WithCapabilities(processorCapabilities))
return newProcessor(pCfg, nextConsumer, set.Logger)
}
89 changes: 40 additions & 49 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package logstransformprocessor // import "github.com/open-telemetry/opentelemetr
import (
"context"
"errors"
"fmt"
"math"
"runtime"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
Expand All @@ -32,22 +32,45 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

type outputType struct {
logs plog.Logs
err error
}

// logsTransformProcessor runs incoming pdata logs through a stanza operator
// pipeline and forwards the transformed results to the next consumer
// asynchronously via background goroutines started in Start.
type logsTransformProcessor struct {
logger *zap.Logger
config *Config

// consumer is the next component in the pipeline; transformed logs are
// handed to it from consumerLoop rather than returned synchronously.
consumer consumer.Logs

// pipe is the stanza operator pipeline; firstOperator is its entry point,
// fed manually by converterLoop.
pipe *pipeline.DirectedPipeline
firstOperator operator.Operator
// emitter is the pipeline's default output; converter/fromConverter
// translate between stanza entries and pdata logs in each direction.
emitter *adapter.LogEmitter
converter *adapter.Converter
fromConverter *adapter.FromPdataConverter
// wg tracks the converter/emitter/consumer goroutines for shutdown.
wg sync.WaitGroup
// NOTE(review): outputChannel looks like a leftover of the old synchronous
// design removed elsewhere in this diff — confirm it is still referenced.
outputChannel chan outputType
}

// newProcessor builds a logsTransformProcessor that forwards transformed logs
// to nextConsumer. The stanza operator pipeline is constructed here, up front,
// so configuration errors surface at creation time rather than at Start.
func newProcessor(config *Config, nextConsumer consumer.Logs, logger *zap.Logger) (*logsTransformProcessor, error) {
	ltp := &logsTransformProcessor{
		logger:   logger,
		config:   config,
		consumer: nextConsumer,
	}

	// The emitter acts as the pipeline's default output; entries it collects
	// are drained later by the processor's background loops.
	ltp.emitter = adapter.NewLogEmitter(logger.Sugar())

	builtPipe, buildErr := pipeline.Config{
		Operators:     ltp.config.BaseConfig.Operators,
		DefaultOutput: ltp.emitter,
	}.Build(logger.Sugar())
	if buildErr != nil {
		return nil, buildErr
	}
	ltp.pipe = builtPipe

	return ltp, nil
}

// Capabilities reports that this processor mutates the log data it consumes,
// so upstream components must not assume their payload is left untouched.
func (ltp *logsTransformProcessor) Capabilities() consumer.Capabilities {
	caps := consumer.Capabilities{MutatesData: true}
	return caps
}

func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
Expand All @@ -61,25 +84,14 @@ func (ltp *logsTransformProcessor) Shutdown(ctx context.Context) error {
}

func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Host) error {
baseCfg := ltp.config.BaseConfig

ltp.emitter = adapter.NewLogEmitter(ltp.logger.Sugar())
pipe, err := pipeline.Config{
Operators: baseCfg.Operators,
DefaultOutput: ltp.emitter,
}.Build(ltp.logger.Sugar())
if err != nil {
return err
}

// There is no need for this processor to use storage
err = pipe.Start(storage.NewNopClient())
err := ltp.pipe.Start(storage.NewNopClient())
if err != nil {
return err
}

ltp.pipe = pipe
pipelineOperators := pipe.Operators()
pipelineOperators := ltp.pipe.Operators()
if len(pipelineOperators) == 0 {
return errors.New("processor requires at least one operator to be configured")
}
Expand All @@ -93,8 +105,6 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
ltp.fromConverter = adapter.NewFromPdataConverter(wkrCount, ltp.logger)
ltp.fromConverter.Start()

ltp.outputChannel = make(chan outputType)

// Below we're starting 3 loops:
// * first which reads all the logs translated by the fromConverter and then forwards
// them to pipeline
Expand All @@ -110,37 +120,16 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos

// ...
// * third which reads all the logs produced by the converter
// (aggregated by Resource) and then places them on the outputChannel
// (aggregated by Resource) and then places them on the next consumer
ltp.wg.Add(1)
go ltp.consumerLoop(ctx)

return nil
}

func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
func (ltp *logsTransformProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
// Add the logs to the chain
err := ltp.fromConverter.Batch(ld)
if err != nil {
return ld, err
}

doneChan := ctx.Done()
for {
select {
case <-doneChan:
ltp.logger.Debug("loop stopped")
return ld, errors.New("processor interrupted")
case output, ok := <-ltp.outputChannel:
if !ok {
return ld, errors.New("processor encountered an issue receiving logs from stanza operators pipeline")
}
if output.err != nil {
return ld, err
}

return output.logs, nil
}
}
return ltp.fromConverter.Batch(ld)
}

// converterLoop reads the log entries produced by the fromConverter and sends them
Expand All @@ -163,7 +152,7 @@ func (ltp *logsTransformProcessor) converterLoop(ctx context.Context) {
for _, e := range entries {
// Add item to the first operator of the pipeline manually
if err := ltp.firstOperator.Process(ctx, e); err != nil {
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the pipeline: %w", err)}
ltp.logger.Error("processor encountered an issue with the pipeline", zap.Error(err))
break
}
}
Expand All @@ -188,7 +177,7 @@ func (ltp *logsTransformProcessor) emitterLoop(ctx context.Context) {
}

if err := ltp.converter.Batch(e); err != nil {
ltp.outputChannel <- outputType{err: fmt.Errorf("processor encountered an issue with the converter: %w", err)}
ltp.logger.Error("processor encountered an issue with the converter", zap.Error(err))
}
}
}
Expand All @@ -210,7 +199,9 @@ func (ltp *logsTransformProcessor) consumerLoop(ctx context.Context) {
return
}

ltp.outputChannel <- outputType{logs: pLogs, err: nil}
if err := ltp.consumer.ConsumeLogs(ctx, pLogs); err != nil {
ltp.logger.Error("processor encountered an issue with next consumer", zap.Error(err))
}
}
}
}
9 changes: 2 additions & 7 deletions processor/logstransformprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,11 @@ type testLogMessage struct {
attributes *map[string]pcommon.Value
}

// Temporary abstraction to avoid "unused" linter
var skip = func(t *testing.T, why string) {
t.Skip(why)
}

func TestLogsTransformProcessor(t *testing.T) {
skip(t, "Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9761")
baseMessage := pcommon.NewValueStr("2022-01-01 01:02:03 INFO this is a test message")
spanID := pcommon.SpanID([8]byte{0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
traceID := pcommon.TraceID([16]byte{0x48, 0x01, 0x40, 0xf3, 0xd7, 0x70, 0xa5, 0xae, 0x32, 0xf0, 0xa2, 0x2b, 0x6a, 0x81, 0x2c, 0xff})
infoSeverityText := "Info"
infoSeverityText := "INFO"

tests := []struct {
name string
Expand Down Expand Up @@ -163,6 +157,7 @@ func TestLogsTransformProcessor(t *testing.T) {
wantLogData := generateLogData(tt.parsedMessages)
err = ltp.ConsumeLogs(context.Background(), sourceLogData)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
logs := tln.AllLogs()
require.Len(t, logs, 1)

Expand Down

0 comments on commit 5c2184f

Please sign in to comment.