Skip to content

Commit

Permalink
Small cleanups in service/builder (#3693)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jul 22, 2021
1 parent 6814536 commit cfbb0f6
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 60 deletions.
50 changes: 24 additions & 26 deletions service/internal/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,38 @@ type dataTypeRequirement struct {
type dataTypeRequirements map[config.DataType]dataTypeRequirement

// Data type requirements for all exporters.
type exportersRequiredDataTypes map[config.Exporter]dataTypeRequirements
type exportersRequiredDataTypes map[config.ComponentID]dataTypeRequirements

// BuildExporters builds Exporters from config.
func BuildExporters(
logger *zap.Logger,
tracerProvider trace.TracerProvider,
buildInfo component.BuildInfo,
config *config.Config,
cfg *config.Config,
factories map[config.Type]component.ExporterFactory,
) (Exporters, error) {
logger = logger.With(zap.String(zapKindKey, zapKindLogExporter))

// We need to calculate required input data types for each exporter so that we know
// which data type must be started for each exporter.
exporterInputDataTypes := calcExportersRequiredDataTypes(config)
exporterInputDataTypes := calcExportersRequiredDataTypes(cfg)

exporters := make(Exporters)
// BuildExporters exporters based on configuration and required input data types.
for _, expCfg := range config.Exporters {

// Build exporters exporters based on configuration and required input data types.
for expID, expCfg := range cfg.Exporters {
set := component.ExporterCreateSettings{
Logger: logger.With(zap.Stringer(zapNameKey, expCfg.ID())),
Logger: logger.With(zap.String(zapNameKey, expID.String())),
TracerProvider: tracerProvider,
BuildInfo: buildInfo,
}
exp, err := buildExporter(context.Background(), factories, set, expCfg, exporterInputDataTypes)

factory, exists := factories[expID.Type()]
if !exists || factory == nil {
return nil, fmt.Errorf("exporter factory not found for type: %s", expID.Type())
}

exp, err := buildExporter(context.Background(), factory, set, expCfg, exporterInputDataTypes[expID])
if err != nil {
return nil, err
}
Expand All @@ -174,7 +181,7 @@ func BuildExporters(
return exporters, nil
}

func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredDataTypes {
func calcExportersRequiredDataTypes(cfg *config.Config) exportersRequiredDataTypes {
// Go over all pipelines. The data type of the pipeline defines what data type
// each exporter is expected to receive. Collect all required types for each
// exporter.
Expand All @@ -187,43 +194,34 @@ func calcExportersRequiredDataTypes(config *config.Config) exportersRequiredData
result := make(exportersRequiredDataTypes)

// Iterate over pipelines.
for _, pipeline := range config.Service.Pipelines {
for _, pipeline := range cfg.Service.Pipelines {
// Iterate over all exporters for this pipeline.
for _, expName := range pipeline.Exporters {
// Find the exporter config by name.
exporter := config.Exporters[expName]

// Create the data type requirement for the exporter if it does not exist.
if result[exporter] == nil {
result[exporter] = make(dataTypeRequirements)
for _, expID := range pipeline.Exporters {
// Create the data type requirement for the expCfg if it does not exist.
if _, ok := result[expID]; !ok {
result[expID] = make(dataTypeRequirements)
}

// Remember that this data type is required for the exporter and also which
// Remember that this data type is required for the expCfg and also which
// pipeline the requirement is coming from.
result[exporter][pipeline.InputType] = dataTypeRequirement{pipeline}
result[expID][pipeline.InputType] = dataTypeRequirement{pipeline}
}
}
return result
}

func buildExporter(
ctx context.Context,
factories map[config.Type]component.ExporterFactory,
factory component.ExporterFactory,
set component.ExporterCreateSettings,
cfg config.Exporter,
exportersInputDataTypes exportersRequiredDataTypes,
inputDataTypes dataTypeRequirements,
) (*builtExporter, error) {
factory := factories[cfg.ID().Type()]
if factory == nil {
return nil, fmt.Errorf("exporter factory not found for type: %s", cfg.ID().Type())
}

exporter := &builtExporter{
logger: set.Logger,
expByDataType: make(map[config.DataType]component.Exporter, 3),
}

inputDataTypes := exportersInputDataTypes[cfg]
if inputDataTypes == nil {
set.Logger.Info("Ignoring exporter as it is not used by any pipeline")
return exporter, nil
Expand Down
2 changes: 1 addition & 1 deletion service/internal/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestBuildExporters(t *testing.T) {
// Since the endpoint of opencensus exporter doesn't actually exist, e1 may
// already stop because it cannot connect.
// The test should stop running if this isn't the error cause.
require.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
require.EqualError(t, err, "rpc error: code = Canceled desc = grpc: the client connection is closing")
}

// Remove the pipeline so that the exporter is not attached to any pipeline.
Expand Down
27 changes: 14 additions & 13 deletions service/internal/builder/extensions_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,26 @@ func BuildExtensions(
config *config.Config,
factories map[config.Type]component.ExtensionFactory,
) (Extensions, error) {
logger = logger.With(zap.String(zapKindKey, zapKindExtension))
extensions := make(Extensions)
for _, extName := range config.Service.Extensions {
extCfg, exists := config.Extensions[extName]
if !exists {
return nil, fmt.Errorf("extension %q is not configured", extName)
for _, extID := range config.Service.Extensions {
extCfg, existsCfg := config.Extensions[extID]
if !existsCfg {
return nil, fmt.Errorf("extension %q is not configured", extID)
}

factory, existsFactory := factories[extID.Type()]
if !existsFactory {
return nil, fmt.Errorf("extension factory for type %q is not configured", extID.Type())
}

set := component.ExtensionCreateSettings{
Logger: logger.With(zap.Stringer(zapNameKey, extCfg.ID())),
Logger: logger.With(
zap.String(zapKindKey, zapKindExtension),
zap.String(zapNameKey, extID.String())),
TracerProvider: tracerProvider,
BuildInfo: buildInfo,
}
ext, err := buildExtension(context.Background(), factories, set, extCfg)
ext, err := buildExtension(context.Background(), factory, set, extCfg)
if err != nil {
return nil, err
}
Expand All @@ -143,12 +149,7 @@ func BuildExtensions(
return extensions, nil
}

func buildExtension(ctx context.Context, factories map[config.Type]component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) {
factory := factories[cfg.ID().Type()]
if factory == nil {
return nil, fmt.Errorf("extension factory for type %q is not configured", cfg.ID().Type())
}

func buildExtension(ctx context.Context, factory component.ExtensionFactory, creationSet component.ExtensionCreateSettings, cfg config.Extension) (*builtExtension, error) {
ext := &builtExtension{
logger: creationSet.Logger,
}
Expand Down
2 changes: 1 addition & 1 deletion service/internal/builder/extensions_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestService_setupExtensions(t *testing.T) {
ext, err := BuildExtensions(zap.NewNop(), trace.NewNoopTracerProvider(), component.DefaultBuildInfo(), tt.config, tt.factories.Extensions)

assert.Error(t, err)
assert.Equal(t, tt.wantErrMsg, err.Error())
assert.EqualError(t, err, tt.wantErrMsg)
assert.Equal(t, 0, len(ext))
})
}
Expand Down
21 changes: 14 additions & 7 deletions service/internal/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,24 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
// the processor itself becomes a consumer for the one that precedes it in
// in the pipeline and so on.
for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- {
procName := pipelineCfg.Processors[i]
procCfg := pb.config.Processors[procName]
procID := pipelineCfg.Processors[i]

factory := pb.factories[procCfg.ID().Type()]
procCfg, existsCfg := pb.config.Processors[procID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", procID)
}

factory, existsFactory := pb.factories[procID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory for type %q is not configured", procID.Type())
}

// This processor must point to the next consumer and then
// it becomes the next for the previous one (previous in the pipeline,
// which we will build in the next loop iteration).
var err error
set := component.ProcessorCreateSettings{
Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.Stringer(zapNameKey, procCfg.ID())),
Logger: pb.logger.With(zap.String(zapKindKey, zapKindProcessor), zap.String(zapNameKey, procID.String())),
TracerProvider: pb.tracerProvider,
BuildInfo: pb.buildInfo,
}
Expand Down Expand Up @@ -188,17 +195,17 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf

default:
return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported",
procName, pipelineCfg.Name, pipelineCfg.InputType)
procID, pipelineCfg.Name, pipelineCfg.InputType)
}

if err != nil {
return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v",
procName, pipelineCfg.Name, err)
procID, pipelineCfg.Name, err)
}

// Check if the factory really created the processor.
if tc == nil && mc == nil && lc == nil {
return nil, fmt.Errorf("factory for %v produced a nil processor", procCfg.ID())
return nil, fmt.Errorf("factory for %v produced a nil processor", procID)
}
}

Expand Down
20 changes: 8 additions & 12 deletions service/internal/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {

// receiversBuilder builds receivers from config.
type receiversBuilder struct {
logger *zap.Logger
buildInfo component.BuildInfo
config *config.Config
builtPipelines BuiltPipelines
factories map[config.Type]component.ReceiverFactory
Expand All @@ -92,19 +90,20 @@ func BuildReceivers(
logger *zap.Logger,
tracerProvider trace.TracerProvider,
buildInfo component.BuildInfo,
config *config.Config,
cfg *config.Config,
builtPipelines BuiltPipelines,
factories map[config.Type]component.ReceiverFactory,
) (Receivers, error) {
rb := &receiversBuilder{logger.With(zap.String(zapKindKey, zapKindReceiver)), buildInfo, config, builtPipelines, factories}
rb := &receiversBuilder{cfg, builtPipelines, factories}

receivers := make(Receivers)
for _, recvCfg := range rb.config.Receivers {
for recvID, recvCfg := range cfg.Receivers {
set := component.ReceiverCreateSettings{
Logger: rb.logger.With(zap.Stringer(zapNameKey, recvCfg.ID())),
Logger: logger.With(zap.String(zapKindKey, zapKindReceiver), zap.String(zapNameKey, recvID.String())),
TracerProvider: tracerProvider,
BuildInfo: buildInfo,
}

rcv, err := rb.buildReceiver(context.Background(), set, recvCfg)
if err != nil {
if err == errUnusedReceiver {
Expand Down Expand Up @@ -199,13 +198,10 @@ func attachReceiverToPipelines(
if err != nil {
if err == componenterror.ErrDataTypeIsNotSupported {
return fmt.Errorf(
"receiver %v does not support %s but it was used in a "+
"%s pipeline",
cfg.ID(),
dataType,
dataType)
"receiver %v does not support %s but it was used in a %s pipeline",
cfg.ID(), dataType, dataType)
}
return fmt.Errorf("cannot create receiver %v: %s", cfg.ID(), err.Error())
return fmt.Errorf("cannot create receiver %v: %w", cfg.ID(), err)
}

// Check if the factory really created the receiver.
Expand Down

0 comments on commit cfbb0f6

Please sign in to comment.