From cfbb0f671372afe0176af74fa8d4461ecb6a6391 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 22 Jul 2021 08:49:06 -0700 Subject: [PATCH] Small cleanups in service/builder (#3693) Signed-off-by: Bogdan Drutu --- service/internal/builder/exporters_builder.go | 50 +++++++++---------- .../builder/exporters_builder_test.go | 2 +- .../internal/builder/extensions_builder.go | 27 +++++----- .../builder/extensions_builder_test.go | 2 +- service/internal/builder/pipelines_builder.go | 21 +++++--- service/internal/builder/receivers_builder.go | 20 +++----- 6 files changed, 62 insertions(+), 60 deletions(-) diff --git a/service/internal/builder/exporters_builder.go b/service/internal/builder/exporters_builder.go index 21c131628481..dace0590538d 100644 --- a/service/internal/builder/exporters_builder.go +++ b/service/internal/builder/exporters_builder.go @@ -139,31 +139,38 @@ type dataTypeRequirement struct { type dataTypeRequirements map[config.DataType]dataTypeRequirement // Data type requirements for all exporters. -type exportersRequiredDataTypes map[config.Exporter]dataTypeRequirements +type exportersRequiredDataTypes map[config.ComponentID]dataTypeRequirements // BuildExporters builds Exporters from config. func BuildExporters( logger *zap.Logger, tracerProvider trace.TracerProvider, buildInfo component.BuildInfo, - config *config.Config, + cfg *config.Config, factories map[config.Type]component.ExporterFactory, ) (Exporters, error) { logger = logger.With(zap.String(zapKindKey, zapKindLogExporter)) // We need to calculate required input data types for each exporter so that we know // which data type must be started for each exporter. - exporterInputDataTypes := calcExportersRequiredDataTypes(config) + exporterInputDataTypes := calcExportersRequiredDataTypes(cfg) exporters := make(Exporters) - // BuildExporters exporters based on configuration and required input data types. - for _, expCfg := range config.Exporters { + + // Build exporters exporters based on configuration and required input data types. + for expID, expCfg := range cfg.Exporters { set := component.ExporterCreateSettings{ - Logger: logger.With(zap.Stringer(zapNameKey, expCfg.ID())), + Logger: logger.With(zap.String(zapNameKey, expID.String())), TracerProvider: tracerProvider, BuildInfo: buildInfo, } - exp, err := buildExporter(context.Background(), factories, set, expCfg, exporterInputDataTypes) + + factory, exists := factories[expID.Type()] + if !exists || factory == nil { + return nil, fmt.Errorf("exporter factory not found for type: %s", expID.Type()) + } + + exp, err := buildExporter(context.Background(), factory, set, expCfg, exporterInputDataTypes[expID]) if err != nil { return nil, err } @@ -174,7 +181,7 @@ func BuildExporters( return exporters, nil } -func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredDataTypes { +func calcExportersRequiredDataTypes(cfg *config.Config) exportersRequiredDataTypes { // Go over all pipelines. The data type of the pipeline defines what data type // each exporter is expected to receive. Collect all required types for each // exporter. @@ -187,20 +194,17 @@ func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredData result := make(exportersRequiredDataTypes) // Iterate over pipelines. - for _, pipeline := range config.Service.Pipelines { + for _, pipeline := range cfg.Service.Pipelines { // Iterate over all exporters for this pipeline. - for _, expName := range pipeline.Exporters { - // Find the exporter config by name. - exporter := config.Exporters[expName] - - // Create the data type requirement for the exporter if it does not exist. - if result[exporter] == nil { - result[exporter] = make(dataTypeRequirements) + for _, expID := range pipeline.Exporters { + // Create the data type requirement for the expCfg if it does not exist. + if _, ok := result[expID]; !ok { + result[expID] = make(dataTypeRequirements) } - // Remember that this data type is required for the exporter and also which + // Remember that this data type is required for the expCfg and also which // pipeline the requirement is coming from. - result[exporter][pipeline.InputType] = dataTypeRequirement{pipeline} + result[expID][pipeline.InputType] = dataTypeRequirement{pipeline} } } return result @@ -208,22 +212,16 @@ func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredData func buildExporter( ctx context.Context, - factories map[config.Type]component.ExporterFactory, + factory component.ExporterFactory, set component.ExporterCreateSettings, cfg config.Exporter, - exportersInputDataTypes exportersRequiredDataTypes, + inputDataTypes dataTypeRequirements, ) (*builtExporter, error) { - factory := factories[cfg.ID().Type()] - if factory == nil { - return nil, fmt.Errorf("exporter factory not found for type: %s", cfg.ID().Type()) - } - exporter := &builtExporter{ logger: set.Logger, expByDataType: make(map[config.DataType]component.Exporter, 3), } - inputDataTypes := exportersInputDataTypes[cfg] if inputDataTypes == nil { set.Logger.Info("Ignoring exporter as it is not used by any pipeline") return exporter, nil diff --git a/service/internal/builder/exporters_builder_test.go b/service/internal/builder/exporters_builder_test.go index e8076170c798..ec74a7632357 100644 --- a/service/internal/builder/exporters_builder_test.go +++ b/service/internal/builder/exporters_builder_test.go @@ -83,7 +83,7 @@ func TestBuildExporters(t *testing.T) { // Since the endpoint of opencensus exporter doesn't actually exist, e1 may // already stop because it cannot connect. // The test should stop running if this isn't the error cause. - require.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") + require.EqualError(t, err, "rpc error: code = Canceled desc = grpc: the client connection is closing") } // Remove the pipeline so that the exporter is not attached to any pipeline. diff --git a/service/internal/builder/extensions_builder.go b/service/internal/builder/extensions_builder.go index 8a116b5fe897..fa9bbb0bf81e 100644 --- a/service/internal/builder/extensions_builder.go +++ b/service/internal/builder/extensions_builder.go @@ -119,20 +119,26 @@ func BuildExtensions( config *config.Config, factories map[config.Type]component.ExtensionFactory, ) (Extensions, error) { - logger = logger.With(zap.String(zapKindKey, zapKindExtension)) extensions := make(Extensions) - for _, extName := range config.Service.Extensions { - extCfg, exists := config.Extensions[extName] - if !exists { - return nil, fmt.Errorf("extension %q is not configured", extName) + for _, extID := range config.Service.Extensions { + extCfg, existsCfg := config.Extensions[extID] + if !existsCfg { + return nil, fmt.Errorf("extension %q is not configured", extID) + } + + factory, existsFactory := factories[extID.Type()] + if !existsFactory { + return nil, fmt.Errorf("extension factory for type %q is not configured", extID.Type()) } set := component.ExtensionCreateSettings{ - Logger: logger.With(zap.Stringer(zapNameKey, extCfg.ID())), + Logger: logger.With( + zap.String(zapKindKey, zapKindExtension), + zap.String(zapNameKey, extID.String())), TracerProvider: tracerProvider, BuildInfo: buildInfo, } - ext, err := buildExtension(context.Background(), factories, set, extCfg) + ext, err := buildExtension(context.Background(), factory, set, extCfg) if err != nil { return nil, err } @@ -143,12 +149,7 @@ func BuildExtensions( return extensions, nil } -func buildExtension(ctx context.Context, factories map[config.Type]component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) { - factory := factories[cfg.ID().Type()] - if factory == nil { - return nil, fmt.Errorf("extension factory for type %q is not configured", cfg.ID().Type()) - } - +func buildExtension(ctx context.Context, factory component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) { ext := &builtExtension{ logger: creationSet.Logger, } diff --git a/service/internal/builder/extensions_builder_test.go b/service/internal/builder/extensions_builder_test.go index cda5f9145019..b6ba87571e09 100644 --- a/service/internal/builder/extensions_builder_test.go +++ b/service/internal/builder/extensions_builder_test.go @@ -119,7 +119,7 @@ func TestService_setupExtensions(t *testing.T) { ext, err := BuildExtensions(zap.NewNop(), trace.NewNoopTracerProvider(), component.DefaultBuildInfo(), tt.config, tt.factories.Extensions) assert.Error(t, err) - assert.Equal(t, tt.wantErrMsg, err.Error()) + assert.EqualError(t, err, tt.wantErrMsg) assert.Equal(t, 0, len(ext)) }) } diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go index a5bd450ac95e..9df8c5b4ec69 100644 --- a/service/internal/builder/pipelines_builder.go +++ b/service/internal/builder/pipelines_builder.go @@ -144,17 +144,24 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf // the processor itself becomes a consumer for the one that precedes it in // in the pipeline and so on. for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- { - procName := pipelineCfg.Processors[i] - procCfg := pb.config.Processors[procName] + procID := pipelineCfg.Processors[i] - factory := pb.factories[procCfg.ID().Type()] + procCfg, existsCfg := pb.config.Processors[procID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", procID) + } + + factory, existsFactory := pb.factories[procID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory for type %q is not configured", procID.Type()) + } // This processor must point to the next consumer and then // it becomes the next for the previous one (previous in the pipeline, // which we will build in the next loop iteration). var err error set := component.ProcessorCreateSettings{ - Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.Stringer(zapNameKey, procCfg.ID())), + Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.String(zapNameKey, procID.String())), TracerProvider: pb.tracerProvider, BuildInfo: pb.buildInfo, } @@ -188,17 +195,17 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf default: return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported", - procName, pipelineCfg.Name, pipelineCfg.InputType) + procID, pipelineCfg.Name, pipelineCfg.InputType) } if err != nil { return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v", - procName, pipelineCfg.Name, err) + procID, pipelineCfg.Name, err) } // Check if the factory really created the processor. if tc == nil && mc == nil && lc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procCfg.ID()) + return nil, fmt.Errorf("factory for %v produced a nil processor", procID) } } diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go index 76653e3ac7ee..715b2b6571dc 100644 --- a/service/internal/builder/receivers_builder.go +++ b/service/internal/builder/receivers_builder.go @@ -80,8 +80,6 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error { // receiversBuilder builds receivers from config. type receiversBuilder struct { - logger *zap.Logger - buildInfo component.BuildInfo config *config.Config builtPipelines BuiltPipelines factories map[config.Type]component.ReceiverFactory @@ -92,19 +90,20 @@ func BuildReceivers( logger *zap.Logger, tracerProvider trace.TracerProvider, buildInfo component.BuildInfo, - config *config.Config, + cfg *config.Config, builtPipelines BuiltPipelines, factories map[config.Type]component.ReceiverFactory, ) (Receivers, error) { - rb := &receiversBuilder{logger.With(zap.String(zapKindKey, zapKindReceiver)), buildInfo, config, builtPipelines, factories} + rb := &receiversBuilder{cfg, builtPipelines, factories} receivers := make(Receivers) - for _, recvCfg := range rb.config.Receivers { + for recvID, recvCfg := range cfg.Receivers { set := component.ReceiverCreateSettings{ - Logger: rb.logger.With(zap.Stringer(zapNameKey, recvCfg.ID())), + Logger: logger.With(zap.String(zapKindKey, zapKindReceiver), zap.String(zapNameKey, recvID.String())), TracerProvider: tracerProvider, BuildInfo: buildInfo, } + rcv, err := rb.buildReceiver(context.Background(), set, recvCfg) if err != nil { if err == errUnusedReceiver { @@ -199,13 +198,10 @@ func attachReceiverToPipelines( if err != nil { if err == componenterror.ErrDataTypeIsNotSupported { return fmt.Errorf( - "receiver %v does not support %s but it was used in a "+ - "%s pipeline", - cfg.ID(), - dataType, - dataType) + "receiver %v does not support %s but it was used in a %s pipeline", + cfg.ID(), dataType, dataType) } - return fmt.Errorf("cannot create receiver %v: %s", cfg.ID(), err.Error()) + return fmt.Errorf("cannot create receiver %v: %w", cfg.ID(), err) } // Check if the factory really created the receiver.