From 6b553b94fb4e08123f9cbba7680127fb25d423af Mon Sep 17 00:00:00 2001
From: Albert <26584478+albertteoh@users.noreply.github.com>
Date: Wed, 3 Aug 2022 05:42:44 +1000
Subject: [PATCH] Fix panic caused by race condition when accessing span attributes (#12661)

* Fix race condition when accessing span attributes

Signed-off-by: albertteoh

* Add changelog

Signed-off-by: albertteoh

* Use multierr to capture both metrics and traces errs

Signed-off-by: albertteoh

* Fix tests

Signed-off-by: albertteoh

* make gotidy

Signed-off-by: albertteoh

* Update comment

Signed-off-by: albertteoh
---
 processor/spanmetricsprocessor/go.mod       |  2 +-
 processor/spanmetricsprocessor/processor.go | 51 +++++++++----------
 .../spanmetricsprocessor/processor_test.go  | 37 ++++++++------
 unreleased/12644-fix-panic-race.yaml        | 16 ++++++
 4 files changed, 62 insertions(+), 44 deletions(-)
 create mode 100755 unreleased/12644-fix-panic-race.yaml

diff --git a/processor/spanmetricsprocessor/go.mod b/processor/spanmetricsprocessor/go.mod
index 31fdc8cd87ebd..0eddcd7eeb89a 100644
--- a/processor/spanmetricsprocessor/go.mod
+++ b/processor/spanmetricsprocessor/go.mod
@@ -11,6 +11,7 @@ require (
 	go.opentelemetry.io/collector v0.56.0
 	go.opentelemetry.io/collector/pdata v0.56.0
 	go.opentelemetry.io/collector/semconv v0.56.0
+	go.uber.org/multierr v1.8.0
 	go.uber.org/zap v1.21.0
 	google.golang.org/grpc v1.48.0
 )
@@ -80,7 +81,6 @@ require (
 	go.opentelemetry.io/otel/metric v0.31.0 // indirect
 	go.opentelemetry.io/otel/trace v1.8.0 // indirect
 	go.uber.org/atomic v1.9.0 // indirect
-	go.uber.org/multierr v1.8.0 // indirect
 	golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
 	golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
 	golang.org/x/text v0.3.7 // indirect
diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go
index 548cef93e10a6..747cf0de4e767 100644
--- a/processor/spanmetricsprocessor/processor.go
+++ b/processor/spanmetricsprocessor/processor.go
@@ -30,6 +30,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
@@ -220,36 +221,32 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
 // It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
 // The original input trace data will be forwarded to the next consumer, unmodified.
 func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
-	// Execute trace to metrics aggregation as a goroutine and only log errors instead
-	// of failing the entire pipeline to prioritize the propagation of trace data,
-	// regardless of error.
-	//
-	// This processor should be treated as a branched, out-of-band process
-	// that should not interfere with the flow of trace data because
-	// it is an orthogonal concern to the trace flow (it should not impact
-	// upstream or downstream pipeline trace components).
-	go func() {
-		// Since this is in a goroutine, the entire func can be locked without
-		// impacting trace processing performance. This also significantly
-		// reduces the number of locks/unlocks to manage, reducing the
-		// concurrency-bug surface area.
-		p.lock.Lock()
-		defer p.lock.Unlock()
-
-		p.aggregateMetrics(traces)
-		m, err := p.buildMetrics()
+	// Forward trace data unmodified and propagate both metrics and trace pipeline errors, if any.
+	return multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces))
+}
 
-		if err != nil {
-			p.logger.Error(err.Error())
-		} else if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
-			p.logger.Error(err.Error())
-		}
+func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
+	p.lock.Lock()
+
+	p.aggregateMetrics(traces)
+	m, err := p.buildMetrics()
+
+	// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
+	// regardless of error while building metrics, before the next batch of spans is received.
+	p.resetExemplarData()
+
+	// This component no longer needs to read the metrics once built, so it is safe to unlock.
+	p.lock.Unlock()
 
-		p.resetExemplarData()
-	}()
+	if err != nil {
+		return err
+	}
 
-	// Forward trace data unmodified.
-	return p.nextConsumer.ConsumeTraces(ctx, traces)
+	if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // buildMetrics collects the computed raw metrics data, builds the metrics object and
diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go
index 424d5418cc172..5d640d88089f8 100644
--- a/processor/spanmetricsprocessor/processor_test.go
+++ b/processor/spanmetricsprocessor/processor_test.go
@@ -35,7 +35,6 @@ import (
 	conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zaptest"
-	"go.uber.org/zap/zaptest/observer"
 	"google.golang.org/grpc/metadata"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
@@ -189,18 +188,22 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
 		consumeTracesErr  error
 	}{
 		{
-			name:              "metricsExporter error",
-			consumeMetricsErr: fmt.Errorf("metricsExporter error"),
+			name:              "ConsumeMetrics error",
+			consumeMetricsErr: fmt.Errorf("consume metrics error"),
 		},
 		{
-			name:             "nextConsumer error",
-			consumeTracesErr: fmt.Errorf("nextConsumer error"),
+			name:             "ConsumeTraces error",
+			consumeTracesErr: fmt.Errorf("consume traces error"),
+		},
+		{
+			name:              "ConsumeMetrics and ConsumeTraces error",
+			consumeMetricsErr: fmt.Errorf("consume metrics error"),
+			consumeTracesErr:  fmt.Errorf("consume traces error"),
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			// Prepare
-			obs, logs := observer.New(zap.ErrorLevel)
-			logger := zap.New(obs)
+			logger := zap.NewNop()
 
 			mexp := &mocks.MetricsExporter{}
 			mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(tc.consumeMetricsErr)
@@ -215,17 +218,19 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
 			// Test
 			ctx := metadata.NewIncomingContext(context.Background(), nil)
 			err := p.ConsumeTraces(ctx, traces)
-			if tc.consumeTracesErr != nil {
-				require.Error(t, err)
-				assert.EqualError(t, err, tc.consumeTracesErr.Error())
-				return
-			}
 
 			// Verify
-			require.NoError(t, err)
-			assert.Eventually(t, func() bool {
-				return logs.FilterMessage(tc.consumeMetricsErr.Error()).Len() > 0
-			}, 10*time.Second, time.Millisecond*100)
+			require.Error(t, err)
+			switch {
+			case tc.consumeMetricsErr != nil && tc.consumeTracesErr != nil:
+				assert.EqualError(t, err, tc.consumeMetricsErr.Error()+"; "+tc.consumeTracesErr.Error())
+			case tc.consumeMetricsErr != nil:
+				assert.EqualError(t, err, tc.consumeMetricsErr.Error())
+			case tc.consumeTracesErr != nil:
+				assert.EqualError(t, err, tc.consumeTracesErr.Error())
+			default:
+				assert.Fail(t, "expected at least one error")
+			}
 		})
 	}
 }
diff --git a/unreleased/12644-fix-panic-race.yaml b/unreleased/12644-fix-panic-race.yaml
new file mode 100755
index 0000000000000..c9e999a322a63
--- /dev/null
+++ b/unreleased/12644-fix-panic-race.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: spanmetricsprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix panic caused by race condition when accessing span attributes.
+
+# One or more tracking issues related to the change
+issues: [12644]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
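
Note on the error-handling change above: ConsumeTraces now relies on go.uber.org/multierr (promoted from an indirect to a direct dependency in go.mod) to report failures from both the metrics branch and the downstream trace consumer instead of only logging them inside a goroutine. The standalone Go sketch below is illustrative only and not part of the patch; it shows the multierr.Combine behaviour that the updated TestProcessorConsumeTracesErrors assertions depend on.

// Illustrative example only (not part of the patch): how multierr.Combine
// merges the results of the metrics and traces branches.
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	metricsErr := errors.New("consume metrics error")
	tracesErr := errors.New("consume traces error")

	// Both branches succeeded: nil inputs are dropped and Combine returns nil,
	// so a healthy pipeline sees no error from ConsumeTraces.
	fmt.Println(multierr.Combine(nil, nil)) // <nil>

	// Exactly one branch failed: that error is returned unchanged.
	fmt.Println(multierr.Combine(metricsErr, nil)) // consume metrics error

	// Both branches failed: the messages are joined with "; ", which is what
	// the "ConsumeMetrics and ConsumeTraces error" test case asserts.
	combined := multierr.Combine(metricsErr, tracesErr)
	fmt.Println(combined) // consume metrics error; consume traces error

	// The individual errors remain accessible to callers that need them.
	fmt.Println(len(multierr.Errors(combined))) // 2
}

Because Combine drops nil inputs, the common case where both the metrics exporter and the next trace consumer succeed still returns nil, so a healthy trace pipeline is unaffected by the switch from logging to error propagation.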