Skip to content

Commit

Permalink
add span processor configuration (#8117)
Browse files Browse the repository at this point in the history
This PR adds support for configuring batch span processor for internal
collector traces. It adds support to export them via the stdout
exporter.

Linked issue
#8106

Follows
#8097

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten committed Jul 21, 2023
1 parent 5f11443 commit 5613523
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 7 deletions.
19 changes: 19 additions & 0 deletions .chloggen/codeboten_add-tracer-provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for span processor configuration for internal traces

# One or more tracking issues or pull requests related to the change
issues: [8106]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
These options are still experimental. To enable them, users must enable both
`telemetry.useOtelForInternalMetrics` and `telemetry.useOtelWithSDKConfigurationForInternalTelemetry`
feature gates.
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.39.0
go.opentelemetry.io/otel/exporters/prometheus v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1
go.opentelemetry.io/otel/exporters/prometheus v0.39.0/go.mod h1:4jo5Q4CROlCpSPsXLhymi+LYrDXd2ObU5wbKayfZs7Y=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0 h1:fl2WmyenEf6LYYlfHAtCUEDyGcpwJNqD4dHGO7PVm4w=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.39.0/go.mod h1:csyQxQ0UHHKVA8KApS7eUO/klMO5sd/av5CNZNU4O6w=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 h1:+XWJd3jf75RXJq29mxbuXhCXFDG3S3R4vBUeSI2P7tE=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0/go.mod h1:hqgzBPTf4yONMFgdZvL/bK42R/iinTyVQtiWihs3SZc=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
Expand Down
59 changes: 57 additions & 2 deletions service/internal/proctelemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"go.opentelemetry.io/collector/obsreport"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
Expand Down Expand Up @@ -58,6 +60,9 @@ var (
attribute.String(semconv.AttributeNetHostName, ""),
attribute.String(semconv.AttributeNetHostPort, ""),
}

errNoValidMetricExporter = errors.New("no valid metric exporter")
errNoValidSpanExporter = errors.New("no valid span exporter")
)

func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
Expand All @@ -78,6 +83,56 @@ func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncE
return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader)
}

func InitSpanProcessor(_ context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) {
if processor.Batch != nil {
if processor.Batch.Exporter.Console != nil {
exp, err := stdouttrace.New(
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
opts := []sdktrace.BatchSpanProcessorOption{}
if processor.Batch.ExportTimeout != nil {
if *processor.Batch.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout)
}
opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout)))
}
if processor.Batch.MaxExportBatchSize != nil {
if *processor.Batch.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize)
}
opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize))
}
if processor.Batch.MaxQueueSize != nil {
if *processor.Batch.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize)
}
opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize))
}
if processor.Batch.ScheduleDelay != nil {
if *processor.Batch.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay)
}
opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay)))
}
return sdktrace.NewBatchSpanProcessor(exp, opts...), nil
}
return nil, errNoValidSpanExporter
}
return nil, fmt.Errorf("unsupported span processor type %v", processor)
}

func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}

opts = append(opts, options...)
return sdktrace.NewTracerProvider(opts...), nil
}

func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) {
opts := []sdkmetric.Option{
sdkmetric.WithResource(res),
Expand Down Expand Up @@ -175,7 +230,7 @@ func initPullExporter(exporter telemetry.MetricExporter, asyncErrorChannel chan
if exporter.Prometheus != nil {
return initPrometheusExporter(exporter.Prometheus, asyncErrorChannel)
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter, opts ...sdkmetric.PeriodicReaderOption) (sdkmetric.Reader, *http.Server, error) {
Expand Down Expand Up @@ -207,7 +262,7 @@ func initPeriodicExporter(ctx context.Context, exporter telemetry.MetricExporter
}
return sdkmetric.NewPeriodicReader(exp, opts...), nil, nil
}
return nil, nil, fmt.Errorf("no valid exporter")
return nil, nil, errNoValidMetricExporter
}

func normalizeEndpoint(endpoint string) string {
Expand Down
97 changes: 94 additions & 3 deletions service/internal/proctelemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "pull/prometheus-invalid-config-no-host",
Expand Down Expand Up @@ -93,14 +93,14 @@ func TestMetricReader(t *testing.T) {
},
},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/no-exporter",
reader: telemetry.MetricReader{
Periodic: &telemetry.PeriodicMetricReader{},
},
err: errors.New("no valid exporter"),
err: errNoValidMetricExporter,
},
{
name: "periodic/console-exporter",
Expand Down Expand Up @@ -344,3 +344,94 @@ func TestMetricReader(t *testing.T) {
})
}
}

func TestSpanProcessor(t *testing.T) {
testCases := []struct {
name string
processor telemetry.SpanProcessor
args any
err error
}{
{
name: "no processor",
err: errors.New("unsupported span processor type {<nil> <nil>}"),
},
{
name: "batch processor invalid exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
Exporter: telemetry.SpanExporter{},
},
},
err: errNoValidSpanExporter,
},
{
name: "batch processor invalid batch size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(-1),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid batch size -1"),
},
{
name: "batch processor invalid export timeout console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ExportTimeout: intPtr(-2),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid export timeout -2"),
},
{
name: "batch processor invalid queue size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxQueueSize: intPtr(-3),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid queue size -3"),
},
{
name: "batch processor invalid schedule delay console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ScheduleDelay: intPtr(-4),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid schedule delay -4"),
},
{
name: "batch processor console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(0),
ExportTimeout: intPtr(0),
MaxQueueSize: intPtr(0),
ScheduleDelay: intPtr(0),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
_, err := InitSpanProcessor(context.Background(), tt.processor)
assert.Equal(t, tt.err, err)
})
}
}
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp
srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp

// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
Expand Down
26 changes: 24 additions & 2 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand Down Expand Up @@ -51,6 +53,7 @@ type telemetryInitializer struct {
views []*view.View
ocRegistry *ocmetric.Registry
mp metric.MeterProvider
tp trace.TracerProvider
servers []*http.Server

useOtel bool
Expand All @@ -60,7 +63,8 @@ type telemetryInitializer struct {

func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig bool) *telemetryInitializer {
return &telemetryInitializer{
mp: noop.NewMeterProvider(),
mp: noopmetric.NewMeterProvider(),
tp: trace.NewNoopTracerProvider(),
useOtel: useOtel,
disableHighCardinality: disableHighCardinality,
extendedConfig: extendedConfig,
Expand All @@ -79,6 +83,12 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component

settings.Logger.Info("Setting up own telemetry...")

if tp, err := tel.initTraces(res, cfg); err == nil {
tel.tp = tp
} else {
return err
}

if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
Expand All @@ -88,6 +98,18 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings component
return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel)
}

func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{}
for _, processor := range cfg.Traces.Processors {
sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor)
if err != nil {
return nil, err
}
opts = append(opts, sdktrace.WithSpanProcessor(sp))
}
return proctelemetry.InitTracerProvider(res, opts)
}

func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()
Expand Down
26 changes: 26 additions & 0 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ type TracesConfig struct {
// tracecontext and b3 are supported. By default, the value is set to empty list and
// context propagation is disabled.
Propagators []string `mapstructure:"propagators"`
// Processors allow configuration of span processors to emit spans to
// any number of suported backends.
Processors []SpanProcessor `mapstructure:"processors"`
}

// Validate checks whether the current configuration is valid
Expand All @@ -132,6 +135,29 @@ func (c *Config) Validate() error {
return nil
}

func (sp *SpanProcessor) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
return nil
}

if conf == nil {
return nil
}

if err := conf.Unmarshal(sp); err != nil {
return fmt.Errorf("invalid span processor configuration: %w", err)
}

if sp.Batch != nil {
if sp.Batch.Exporter.Console == nil {

This comment has been minimized.

Copy link
@bogdandrutu

bogdandrutu Jul 24, 2023

Member

This sound more like a Validate concern and not an Unmarshal. The rule of thumb that I use in general (maybe good to document somewhere) is:

  • If the config is not unmarshalled but provided from different sources (like hardcoded), does this verification apply to that case as well? If yes, then should be part of Validate.

This comment has been minimized.

Copy link
@bogdandrutu

bogdandrutu Jul 24, 2023

Member

Another rule is that it should be on the deepest object, in this case the sp.Batch.Exporter

return fmt.Errorf("invalid exporter configuration")
}
return nil
}
return fmt.Errorf("unsupported span processor type %s", conf.AllKeys())
}

func (mr *MetricReader) Unmarshal(conf *confmap.Conf) error {
if !obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() {
// only unmarshal if feature gate is enabled
Expand Down
Loading

0 comments on commit 5613523

Please sign in to comment.