Skip to content

Commit

Permalink
Isolate tracer provider per-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jun 14, 2022
1 parent 32aa6ff commit 76f8681
Show file tree
Hide file tree
Showing 167 changed files with 754 additions and 955 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- Go API: Added component bundle `./public/components/aws` for all AWS components, including a `RunLambda` function.
- New `cached` processor.
- Go API: New APIs for registering both metrics exporters and open telemetry tracer plugins.
- Go API: The stream builder API now supports configuring a tracer, and tracer configuration is now isolated to the stream being executed.

### Fixed

Expand Down
3 changes: 3 additions & 0 deletions internal/bundle/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"net/http"
"regexp"

"go.opentelemetry.io/otel/trace"

"github.com/benthosdev/benthos/v4/internal/bloblang"
"github.com/benthosdev/benthos/v4/internal/bloblang/query"
"github.com/benthosdev/benthos/v4/internal/component/buffer"
Expand Down Expand Up @@ -41,6 +43,7 @@ type NewManagement interface {

Metrics() metrics.Type
Logger() log.Modular
Tracer() trace.TracerProvider
BloblEnvironment() *bloblang.Environment

RegisterEndpoint(path, desc string, h http.HandlerFunc)
Expand Down
8 changes: 5 additions & 3 deletions internal/bundle/tracers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sort"

"go.opentelemetry.io/otel/trace"

"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/tracer"
"github.com/benthosdev/benthos/v4/internal/docs"
Expand All @@ -23,7 +25,7 @@ func (e *Environment) TracersAdd(constructor TracerConstructor, spec docs.Compon
}

// TracersInit attempts to initialise a tracers exporter from a config.
func (e *Environment) TracersInit(conf tracer.Config, nm NewManagement) (tracer.Type, error) {
func (e *Environment) TracersInit(conf tracer.Config, nm NewManagement) (trace.TracerProvider, error) {
return e.tracers.Init(conf, nm)
}

Expand All @@ -35,7 +37,7 @@ func (e *Environment) TracersDocs() []docs.ComponentSpec {
//------------------------------------------------------------------------------

// TracerConstructor constructs an tracer component.
type TracerConstructor func(tracer.Config, NewManagement) (tracer.Type, error)
type TracerConstructor func(tracer.Config, NewManagement) (trace.TracerProvider, error)

type tracerSpec struct {
constructor TracerConstructor
Expand Down Expand Up @@ -66,7 +68,7 @@ func (s *TracerSet) Add(constructor TracerConstructor, spec docs.ComponentSpec)
}

// Init attempts to initialise an tracer from a config.
func (s *TracerSet) Init(conf tracer.Config, nm NewManagement) (tracer.Type, error) {
func (s *TracerSet) Init(conf tracer.Config, nm NewManagement) (trace.TracerProvider, error) {
spec, exists := s.specs[conf.Type]
if !exists {
return nil, component.ErrInvalidType("tracer", conf.Type)
Expand Down
15 changes: 0 additions & 15 deletions internal/bundle/tracing/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import (
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/bundle/tracing"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/manager"
"github.com/benthosdev/benthos/v4/internal/manager/mock"
"github.com/benthosdev/benthos/v4/internal/message"

"github.com/stretchr/testify/assert"
Expand All @@ -36,9 +33,6 @@ func TestBundleInputTracing(t *testing.T) {

mgr, err := manager.New(
manager.NewResourceConfig(),
mock.NewManager(),
log.Noop(),
metrics.Noop(),
manager.OptSetEnvironment(tenv),
)
require.NoError(t, err)
Expand Down Expand Up @@ -85,9 +79,6 @@ func TestBundleOutputTracing(t *testing.T) {

mgr, err := manager.New(
manager.NewResourceConfig(),
mock.NewManager(),
log.Noop(),
metrics.Noop(),
manager.OptSetEnvironment(tenv),
)
require.NoError(t, err)
Expand Down Expand Up @@ -146,9 +137,6 @@ func TestBundleOutputWithProcessorsTracing(t *testing.T) {

mgr, err := manager.New(
manager.NewResourceConfig(),
mock.NewManager(),
log.Noop(),
metrics.Noop(),
manager.OptSetEnvironment(tenv),
)
require.NoError(t, err)
Expand Down Expand Up @@ -223,9 +211,6 @@ root.count = if $ctr % 2 == 0 { throw("nah %v".format($ctr)) } else { $ctr }

mgr, err := manager.New(
manager.NewResourceConfig(),
mock.NewManager(),
log.Noop(),
metrics.Noop(),
manager.OptSetEnvironment(tenv),
)
require.NoError(t, err)
Expand Down
21 changes: 17 additions & 4 deletions internal/cli/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
"syscall"
"time"

"go.opentelemetry.io/otel/trace"
"gopkg.in/natefinch/lumberjack.v2"
"gopkg.in/yaml.v3"

"github.com/benthosdev/benthos/v4/internal/api"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/component/tracer"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/log"
Expand Down Expand Up @@ -320,12 +320,18 @@ func cmdService(
}()

// Create our tracer type.
var trac tracer.Type
var trac trace.TracerProvider
if trac, err = bundle.AllTracers.Init(conf.Tracer, tmpMgr); err != nil {
logger.Errorf("Failed to initialise tracer: %v\n", err)
return 1
}
defer trac.Close()
defer func() {
if shutter, ok := trac.(interface {
Shutdown(context.Context) error
}); ok {
_ = shutter.Shutdown(context.Background())
}
}()

// Create HTTP API with a sanitised service config.
var sanitNode yaml.Node
Expand All @@ -345,7 +351,14 @@ func cmdService(
}

// Create resource manager.
manager, err := manager.New(conf.ResourceConfig, httpServer, logger, stats, manager.OptSetStreamsMode(streamsMode))
manager, err := manager.New(
conf.ResourceConfig,
manager.OptSetAPIReg(httpServer),
manager.OptSetLogger(logger),
manager.OptSetMetrics(stats),
manager.OptSetTracer(trac),
manager.OptSetStreamsMode(streamsMode),
)
if err != nil {
logger.Errorf("Failed to create resource: %v\n", err)
return 1
Expand Down
5 changes: 2 additions & 3 deletions internal/cli/test/processors_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/benthosdev/benthos/v4/internal/bloblang/mapping"
"github.com/benthosdev/benthos/v4/internal/bloblang/parser"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/component/processor"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
Expand Down Expand Up @@ -99,7 +98,7 @@ func (p *ProcessorsProvider) ProvideBloblang(pathStr string) ([]processor.V1, er
}

return []processor.V1{
processor.NewV2BatchedToV1Processor("bloblang", newBloblang(exec, p.logger), metrics.Noop()),
processor.NewV2BatchedToV1Processor("bloblang", newBloblang(exec, p.logger), mock.NewManager()),
}, nil
}

Expand Down Expand Up @@ -145,7 +144,7 @@ func (b *bloblangProc) Close(context.Context) error {
//------------------------------------------------------------------------------

func (p *ProcessorsProvider) initProcs(confs cachedConfig) ([]processor.V1, error) {
mgr, err := manager.New(confs.mgr, mock.NewManager(), p.logger, metrics.Noop())
mgr, err := manager.New(confs.mgr, manager.OptSetLogger(p.logger))
if err != nil {
return nil, fmt.Errorf("failed to initialise resources: %v", err)
}
Expand Down
14 changes: 9 additions & 5 deletions internal/component/buffer/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel/trace"

"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/log"
Expand Down Expand Up @@ -51,6 +53,7 @@ type ReaderWriter interface {
type Stream struct {
stats metrics.Type
log log.Modular
tracer trace.TracerProvider
typeStr string

buffer ReaderWriter
Expand All @@ -65,11 +68,12 @@ type Stream struct {
}

// NewStream creates a new Producer/Consumer around a buffer.
func NewStream(typeStr string, buffer ReaderWriter, log log.Modular, stats metrics.Type) Streamed {
func NewStream(typeStr string, buffer ReaderWriter, mgr component.Observability) Streamed {
m := Stream{
typeStr: typeStr,
stats: stats,
log: log,
stats: mgr.Metrics(),
log: mgr.Logger(),
tracer: mgr.Tracer(),
buffer: buffer,
shutSig: shutdown.NewSignaller(),
messagesOut: make(chan message.Transaction),
Expand Down Expand Up @@ -124,7 +128,7 @@ func (m *Stream) inputLoop() {
}

batchLen := tr.Payload.Len()
err := m.buffer.Write(closeAtLeisureCtx, tracing.WithSiblingSpans(m.typeStr, tr.Payload), ackFunc)
err := m.buffer.Write(closeAtLeisureCtx, tracing.WithSiblingSpans(m.tracer, m.typeStr, tr.Payload), ackFunc)
if err == nil {
mReceivedCount.Incr(int64(batchLen))
mReceivedBatchCount.Incr(1)
Expand Down Expand Up @@ -170,7 +174,7 @@ func (m *Stream) outputLoop() {
}

// It's possible that the buffer wiped our previous root span.
tracing.InitSpans(m.typeStr, msg)
tracing.InitSpans(m.tracer, m.typeStr, msg)

batchLen := msg.Len()

Expand Down
9 changes: 4 additions & 5 deletions internal/component/buffer/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/message"
)

Expand All @@ -23,7 +22,7 @@ func TestStreamMemoryBuffer(t *testing.T) {
tChan := make(chan message.Transaction)
resChan := make(chan error)

b := NewStream("meow", newMemoryBuffer(int(total)), log.Noop(), metrics.Noop())
b := NewStream("meow", newMemoryBuffer(int(total)), component.NoopObservability())
require.NoError(t, b.Consume(tChan))

var i uint8
Expand Down Expand Up @@ -156,7 +155,7 @@ func TestStreamBufferClosing(t *testing.T) {
tChan := make(chan message.Transaction)
resChan := make(chan error)

b := NewStream("meow", newMemoryBuffer(int(total)), log.Noop(), metrics.Noop())
b := NewStream("meow", newMemoryBuffer(int(total)), component.NoopObservability())
require.NoError(t, b.Consume(tChan))

var i uint8
Expand Down Expand Up @@ -247,7 +246,7 @@ func TestStreamReadErrors(t *testing.T) {
errBuf.readErrs <- errors.New("first error")
errBuf.readErrs <- errors.New("second error")

b := NewStream("meow", errBuf, log.Noop(), metrics.Noop())
b := NewStream("meow", errBuf, component.NoopObservability())
require.NoError(t, b.Consume(tChan))

var tran message.Transaction
Expand Down
Loading

0 comments on commit 76f8681

Please sign in to comment.