Skip to content

Commit

Permalink
Change resource processor to use processor helper (#768)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 19, 2020
1 parent f7360ee commit d51f6ba
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 27 deletions.
4 changes: 2 additions & 2 deletions processor/resourcedetectionprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factories.Processors[typeStr] = &Factory{}
factory := NewFactory()
factories.Processors[typeStr] = factory

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
Expand Down
38 changes: 20 additions & 18 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
Expand All @@ -36,8 +37,7 @@ const (
typeStr = "resourcedetection"
)

// Factory is the factory for resourcedetection processor.
type Factory struct {
type factory struct {
resourceProviderFactory *internal.ResourceProviderFactory

// providers stores a provider for each named processor that
Expand All @@ -46,27 +46,32 @@ type Factory struct {
lock sync.Mutex
}

// NewFactory creates a new factory for resourcedetection processor.
func NewFactory() *Factory {
// NewFactory creates a new factory for ResourceDetection processor.
func NewFactory() component.ProcessorFactory {
resourceProviderFactory := internal.NewProviderFactory(map[internal.DetectorType]internal.DetectorFactory{
env.TypeStr: env.NewDetector,
gce.TypeStr: gce.NewDetector,
ec2.TypeStr: ec2.NewDetector,
})

return &Factory{
f := &factory{
resourceProviderFactory: resourceProviderFactory,
providers: map[string]*internal.ResourceProvider{},
}

return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(f.createTraceProcessor),
processorhelper.WithMetrics(f.createMetricsProcessor))
}

// Type gets the type of the Option config created by this factory.
func (*Factory) Type() configmodels.Type {
func (*factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (*Factory) CreateDefaultConfig() configmodels.Processor {
func createDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
Expand All @@ -78,42 +83,39 @@ func (*Factory) CreateDefaultConfig() configmodels.Processor {
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
func (f *factory) createTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
nextConsumer consumer.TraceConsumer,
) (component.TraceProcessor, error) {
oCfg := cfg.(*Config)

provider, err := f.getResourceProvider(ctx, params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
provider, err := f.getResourceProvider(params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
if err != nil {
return nil, err
}

return newResourceTraceProcessor(ctx, nextConsumer, provider, oCfg.Override), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
func (f *factory) createMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)

provider, err := f.getResourceProvider(ctx, params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
provider, err := f.getResourceProvider(params.Logger, cfg.Name(), oCfg.Timeout, oCfg.Detectors)
if err != nil {
return nil, err
}

return newResourceMetricProcessor(ctx, nextConsumer, provider, oCfg.Override), nil
}

func (f *Factory) getResourceProvider(
ctx context.Context,
func (f *factory) getResourceProvider(
logger *zap.Logger,
processorName string,
timeout time.Duration,
Expand Down
3 changes: 1 addition & 2 deletions processor/resourcedetectionprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
)

func TestCreateDefaultConfig(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()
cfg := createDefaultConfig()
assert.NoError(t, configcheck.ValidateConfig(cfg))
assert.NotNil(t, cfg)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type resourceTraceProcessor struct {
next consumer.TraceConsumer
}

func newResourceTraceProcessor(ctx context.Context, next consumer.TraceConsumer, provider *internal.ResourceProvider, override bool) *resourceTraceProcessor {
func newResourceTraceProcessor(_ context.Context, next consumer.TraceConsumer, provider *internal.ResourceProvider, override bool) *resourceTraceProcessor {
return &resourceTraceProcessor{
provider: provider,
override: override,
Expand Down Expand Up @@ -79,7 +79,7 @@ type resourceMetricProcessor struct {
next consumer.MetricsConsumer
}

func newResourceMetricProcessor(ctx context.Context, next consumer.MetricsConsumer, provider *internal.ResourceProvider, override bool) *resourceMetricProcessor {
func newResourceMetricProcessor(_ context.Context, next consumer.MetricsConsumer, provider *internal.ResourceProvider, override bool) *resourceMetricProcessor {
return &resourceMetricProcessor{
provider: provider,
override: override,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestResourceProcessor(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := &Factory{providers: map[string]*internal.ResourceProvider{}}
factory := &factory{providers: map[string]*internal.ResourceProvider{}}

md1 := &MockDetector{}
md1.On("Detect").Return(tt.detectedResource, tt.detectedError)
Expand All @@ -174,7 +174,7 @@ func TestResourceProcessor(t *testing.T) {

// Test trace consuner
ttn := &exportertest.SinkTraceExporter{}
rtp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, ttn, cfg)
rtp, err := factory.createTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, ttn)

if tt.expectedNewError != "" {
assert.EqualError(t, err, tt.expectedNewError)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestResourceProcessor(t *testing.T) {

// Test metrics consumer
tmn := &exportertest.SinkMetricsExporter{}
rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, tmn, cfg)
rmp, err := factory.createMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, tmn)

if tt.expectedNewError != "" {
assert.EqualError(t, err, tt.expectedNewError)
Expand Down

0 comments on commit d51f6ba

Please sign in to comment.