Skip to content

Commit

Permalink
Fix panic caused by race condition when accessing span attributes (op…
Browse files Browse the repository at this point in the history
…en-telemetry#12661)

* Fix race condition when accessing span attributes

Signed-off-by: albertteoh <[email protected]>

* Add changelog

Signed-off-by: albertteoh <[email protected]>

* Use multierr to capture both metrics and traces errs

Signed-off-by: albertteoh <[email protected]>

* Fix tests

Signed-off-by: albertteoh <[email protected]>

* make gotidy

Signed-off-by: albertteoh <[email protected]>

* Update comment

Signed-off-by: albertteoh <[email protected]>
  • Loading branch information
albertteoh committed Aug 2, 2022
1 parent 3d21b13 commit 6b553b9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 44 deletions.
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector v0.56.0
go.opentelemetry.io/collector/pdata v0.56.0
go.opentelemetry.io/collector/semconv v0.56.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.48.0
)
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
51 changes: 24 additions & 27 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -220,36 +221,32 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
// Execute trace to metrics aggregation as a goroutine and only log errors instead
// of failing the entire pipeline to prioritize the propagation of trace data,
// regardless of error.
//
// This processor should be treated as a branched, out-of-band process
// that should not interfere with the flow of trace data because
// it is an orthogonal concern to the trace flow (it should not impact
// upstream or downstream pipeline trace components).
go func() {
// Since this is in a goroutine, the entire func can be locked without
// impacting trace processing performance. This also significantly
// reduces the number of locks/unlocks to manage, reducing the
// concurrency-bug surface area.
p.lock.Lock()
defer p.lock.Unlock()

p.aggregateMetrics(traces)
m, err := p.buildMetrics()
// Forward trace data unmodified and propagate both metrics and trace pipeline errors, if any.
return multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces))
}

if err != nil {
p.logger.Error(err.Error())
} else if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
p.logger.Error(err.Error())
}
func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
p.lock.Lock()

p.aggregateMetrics(traces)
m, err := p.buildMetrics()

// Exemplars are only relevant to this batch of traces, so must be cleared within the lock,
// regardless of error while building metrics, before the next batch of spans is received.
p.resetExemplarData()

// This component no longer needs to read the metrics once built, so it is safe to unlock.
p.lock.Unlock()

p.resetExemplarData()
}()
if err != nil {
return err
}

// Forward trace data unmodified.
return p.nextConsumer.ConsumeTraces(ctx, traces)
if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
return err
}

return nil
}

// buildMetrics collects the computed raw metrics data, builds the metrics object and
Expand Down
37 changes: 21 additions & 16 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/metadata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -189,18 +188,22 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
consumeTracesErr error
}{
{
name: "metricsExporter error",
consumeMetricsErr: fmt.Errorf("metricsExporter error"),
name: "ConsumeMetrics error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
},
{
name: "nextConsumer error",
consumeTracesErr: fmt.Errorf("nextConsumer error"),
name: "ConsumeTraces error",
consumeTracesErr: fmt.Errorf("consume traces error"),
},
{
name: "ConsumeMetrics and ConsumeTraces error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
consumeTracesErr: fmt.Errorf("consume traces error"),
},
} {
t.Run(tc.name, func(t *testing.T) {
// Prepare
obs, logs := observer.New(zap.ErrorLevel)
logger := zap.New(obs)
logger := zap.NewNop()

mexp := &mocks.MetricsExporter{}
mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(tc.consumeMetricsErr)
Expand All @@ -215,17 +218,19 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.ConsumeTraces(ctx, traces)
if tc.consumeTracesErr != nil {
require.Error(t, err)
assert.EqualError(t, err, tc.consumeTracesErr.Error())
return
}

// Verify
require.NoError(t, err)
assert.Eventually(t, func() bool {
return logs.FilterMessage(tc.consumeMetricsErr.Error()).Len() > 0
}, 10*time.Second, time.Millisecond*100)
require.Error(t, err)
switch {
case tc.consumeMetricsErr != nil && tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error()+"; "+tc.consumeTracesErr.Error())
case tc.consumeMetricsErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error())
case tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeTracesErr.Error())
default:
assert.Fail(t, "expected at least one error")
}
})
}
}
Expand Down
16 changes: 16 additions & 0 deletions unreleased/12644-fix-panic-race.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic caused by race condition when accessing span attributes.

# One or more tracking issues related to the change
issues: [12644]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

0 comments on commit 6b553b9

Please sign in to comment.