Skip to content

Commit

Permalink
use exporterhelper
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jun 8, 2023
1 parent 49d6711 commit dd5d78c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 57 deletions.
61 changes: 40 additions & 21 deletions processor/websocketprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/websocketprocessor/internal/metadata"
)

var processors = sharedcomponent.NewSharedComponents()

func NewFactory() processor.Factory {
return processor.NewFactory(
metadata.Type,
Expand All @@ -23,32 +27,47 @@ func NewFactory() processor.Factory {
)
}

func createMetricsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Metrics) (processor.Metrics, error) {
func createMetricsProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Metrics) (processor.Metrics, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.metricsSink = consumer
return p, nil
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeMetrics
return processorhelper.NewMetricsProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}

func createLogsProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Logs) (processor.Logs, error) {
func createLogsProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Logs) (processor.Logs, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.logsSink = consumer
return p, nil
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeLogs
return processorhelper.NewLogsProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}

func createTraceProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, consumer consumer.Traces) (processor.Traces, error) {
func createTraceProcessor(ctx context.Context, params processor.CreateSettings, cfg component.Config, c consumer.Traces) (processor.Traces, error) {
rCfg := cfg.(*Config)
p, err := newProcessor(params, rCfg)
if err != nil {
return nil, err
}
p.tracesSink = consumer
return p, nil
p := processors.GetOrAdd(cfg, func() component.Component {
return newProcessor(params, rCfg)
})
fn := p.Unwrap().(*wsprocessor).ConsumeTraces
return processorhelper.NewTracesProcessor(ctx, params, cfg, c,
fn,
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
processorhelper.WithStart(p.Start),
processorhelper.WithShutdown(p.Shutdown))
}
3 changes: 3 additions & 0 deletions processor/websocketprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/webso
go 1.19

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.79.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector v0.79.0
go.opentelemetry.io/collector/component v0.79.0
Expand Down Expand Up @@ -50,3 +51,5 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
2 changes: 2 additions & 0 deletions processor/websocketprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 8 additions & 36 deletions processor/websocketprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -26,41 +24,21 @@ import (
type wsprocessor struct {
config *Config
telemetrySettings component.TelemetrySettings
obsproc *obsreport.Processor
logsSink consumer.Logs
metricsSink consumer.Metrics
tracesSink consumer.Traces
server *http.Server
shutdownWG sync.WaitGroup
cs *channelSet
}

var processors = map[*Config]*wsprocessor{}

var logMarshaler = &plog.JSONMarshaler{}
var metricMarshaler = &pmetric.JSONMarshaler{}
var traceMarshaler = &ptrace.JSONMarshaler{}

func newProcessor(settings processor.CreateSettings, config *Config) (*wsprocessor, error) {
if p, ok := processors[config]; ok {
return p, nil
}
obsproc, err := obsreport.NewProcessor(obsreport.ProcessorSettings{
ProcessorID: settings.ID,
ProcessorCreateSettings: settings,
})
if err != nil {
return nil, err
}
p := &wsprocessor{
func newProcessor(settings processor.CreateSettings, config *Config) *wsprocessor {
return &wsprocessor{
config: config,
obsproc: obsproc,
telemetrySettings: settings.TelemetrySettings,
cs: newChannelSet(),
}
processors[config] = p

return p, nil
}

func (w *wsprocessor) Start(_ context.Context, host component.Host) error {
Expand Down Expand Up @@ -110,38 +88,32 @@ func (w *wsprocessor) Shutdown(ctx context.Context) error {
return nil
}

func (w *wsprocessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: false,
}
}

func (w *wsprocessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
func (w *wsprocessor) ConsumeMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
b, err := metricMarshaler.MarshalMetrics(md)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.metricsSink.ConsumeMetrics(ctx, md)
return md, nil
}

func (w *wsprocessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (w *wsprocessor) ConsumeLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
b, err := logMarshaler.MarshalLogs(ld)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.logsSink.ConsumeLogs(ctx, ld)
return ld, nil
}

func (w *wsprocessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
func (w *wsprocessor) ConsumeTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
b, err := traceMarshaler.MarshalTraces(td)
if err != nil {
w.telemetrySettings.Logger.Debug("Error serializing to JSON", zap.Error(err))
} else {
w.cs.writeBytes(b)
}
return w.tracesSink.ConsumeTraces(ctx, td)
return td, nil
}

0 comments on commit dd5d78c

Please sign in to comment.