Skip to content

Commit

Permalink
Adapt ottl configuration to same BoolExpr, remove duplicate code (#16446
Browse files Browse the repository at this point in the history
)

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 23, 2022
1 parent 9e27717 commit 906f411
Show file tree
Hide file tree
Showing 17 changed files with 290 additions and 408 deletions.
11 changes: 11 additions & 0 deletions .chloggen/ottelfilternext.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filterprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adapt ottl configuration to same BoolExpr, remove duplicate code

# One or more tracking issues related to the change
issues: [16446]
9 changes: 8 additions & 1 deletion internal/filter/expr/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,12 @@ func (om orMatcher[K]) Eval(ctx context.Context, tCtx K) (bool, error) {
}

func Or[K any](matchers ...BoolExpr[K]) BoolExpr[K] {
return orMatcher[K]{matchers: matchers}
switch len(matchers) {
case 0:
return nil
case 1:
return matchers[0]
default:
return orMatcher[K]{matchers: matchers}
}
}
14 changes: 8 additions & 6 deletions processor/attributesprocessor/attributes_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ func (a *logAttributesProcessor) processLogs(ctx context.Context, ld plog.Logs)
library := ils.Scope()
for k := 0; k < logs.Len(); k++ {
lr := logs.At(k)
skip, err := a.skipExpr.Eval(ctx, ottllog.NewTransformContext(lr, library, resource))
if err != nil {
return ld, err
}
if skip {
continue
if a.skipExpr != nil {
skip, err := a.skipExpr.Eval(ctx, ottllog.NewTransformContext(lr, library, resource))
if err != nil {
return ld, err
}
if skip {
continue
}
}

a.attrProc.Process(ctx, a.logger, lr.Attributes())
Expand Down
15 changes: 8 additions & 7 deletions processor/attributesprocessor/attributes_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ func (a *metricAttributesProcessor) processMetrics(ctx context.Context, md pmetr
metrics := ils.Metrics()
for k := 0; k < metrics.Len(); k++ {
m := metrics.At(k)
skip, err := a.skipExpr.Eval(ctx, ottlmetric.NewTransformContext(m, scope, resource))
if err != nil {
return md, err
if a.skipExpr != nil {
skip, err := a.skipExpr.Eval(ctx, ottlmetric.NewTransformContext(m, scope, resource))
if err != nil {
return md, err
}
if skip {
continue
}
}
if skip {
continue
}

a.processMetricAttributes(ctx, m)
}
}
Expand Down
16 changes: 8 additions & 8 deletions processor/attributesprocessor/attributes_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (a *spanAttributesProcessor) processTraces(ctx context.Context, td ptrace.T
scope := ils.Scope()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)

skip, err := a.skipExpr.Eval(ctx, ottlspan.NewTransformContext(span, scope, resource))
if err != nil {
return td, err
}
if skip {
continue
if a.skipExpr != nil {
skip, err := a.skipExpr.Eval(ctx, ottlspan.NewTransformContext(span, scope, resource))
if err != nil {
return td, err
}
if skip {
continue
}
}

a.attrProc.Process(ctx, a.logger, span.Attributes())
}
}
Expand Down
21 changes: 5 additions & 16 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,11 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filtermetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset/regexp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/common"
)

Expand Down Expand Up @@ -291,32 +285,27 @@ func (cfg *Config) Validate() error {
var errors error

if cfg.Traces.SpanConditions != nil {
spanp := ottlspan.NewParser(common.Functions[ottlspan.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spanp.ParseStatements(common.PrepareConditionForParsing(cfg.Traces.SpanConditions))
_, err := common.ParseSpan(cfg.Traces.SpanConditions, component.TelemetrySettings{})
errors = multierr.Append(errors, err)
}

if cfg.Traces.SpanEventConditions != nil {
spaneventp := ottlspanevent.NewParser(common.Functions[ottlspanevent.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := spaneventp.ParseStatements(common.PrepareConditionForParsing(cfg.Traces.SpanEventConditions))
_, err := common.ParseSpanEvent(cfg.Traces.SpanEventConditions, component.TelemetrySettings{})
errors = multierr.Append(errors, err)
}

if cfg.Metrics.MetricConditions != nil {
metricp := ottlmetric.NewParser(common.Functions[ottlmetric.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := metricp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.MetricConditions))
_, err := common.ParseMetric(cfg.Metrics.MetricConditions, component.TelemetrySettings{})
errors = multierr.Append(errors, err)
}

if cfg.Metrics.DataPointConditions != nil {
datapointp := ottldatapoint.NewParser(common.Functions[ottldatapoint.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := datapointp.ParseStatements(common.PrepareConditionForParsing(cfg.Metrics.DataPointConditions))
_, err := common.ParseDataPoint(cfg.Metrics.DataPointConditions, component.TelemetrySettings{})
errors = multierr.Append(errors, err)
}

if cfg.Logs.LogConditions != nil {
logp := ottllog.NewParser(common.Functions[ottllog.TransformContext](), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := logp.ParseStatements(common.PrepareConditionForParsing(cfg.Logs.LogConditions))
_, err := common.ParseLog(cfg.Logs.LogConditions, component.TelemetrySettings{})
errors = multierr.Append(errors, err)
}

Expand Down
6 changes: 3 additions & 3 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createMetricsProcessor(
cfg component.ProcessorConfig,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
fp, err := newFilterMetricProcessor(set.Logger, cfg.(*Config))
fp, err := newFilterMetricProcessor(set.TelemetrySettings, cfg.(*Config))
if err != nil {
return nil, err
}
Expand All @@ -74,7 +74,7 @@ func createLogsProcessor(
cfg component.ProcessorConfig,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
fp, err := newFilterLogsProcessor(set.Logger, cfg.(*Config))
fp, err := newFilterLogsProcessor(set.TelemetrySettings, cfg.(*Config))
if err != nil {
return nil, err
}
Expand All @@ -93,7 +93,7 @@ func createTracesProcessor(
cfg component.ProcessorConfig,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
fp, err := newFilterSpansProcessor(set.Logger, cfg.(*Config))
fp, err := newFilterSpansProcessor(set.TelemetrySettings, cfg.(*Config))
if err != nil {
return nil, err
}
Expand Down
39 changes: 0 additions & 39 deletions processor/filterprocessor/internal/common/functions.go

This file was deleted.

34 changes: 0 additions & 34 deletions processor/filterprocessor/internal/common/matcher.go

This file was deleted.

107 changes: 101 additions & 6 deletions processor/filterprocessor/internal/common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,109 @@

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/common"

import "fmt"
import (
"context"

const functionWithCondition = "drop() where %v"
"go.opentelemetry.io/collector/component"

func PrepareConditionForParsing(conditions []string) []string {
validStatements := make([]string, len(conditions))
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs"
)

func ParseSpan(conditions []string, set component.TelemetrySettings) (expr.BoolExpr[ottlspan.TransformContext], error) {
statmentsStr := conditionsToStatements(conditions)
parser := ottlspan.NewParser(functions[ottlspan.TransformContext](), set)
statements, err := parser.ParseStatements(statmentsStr)
if err != nil {
return nil, err
}
return statementsToExpr(statements), nil
}

func ParseSpanEvent(conditions []string, set component.TelemetrySettings) (expr.BoolExpr[ottlspanevent.TransformContext], error) {
statmentsStr := conditionsToStatements(conditions)
parser := ottlspanevent.NewParser(functions[ottlspanevent.TransformContext](), set)
statements, err := parser.ParseStatements(statmentsStr)
if err != nil {
return nil, err
}
return statementsToExpr(statements), nil
}

func ParseLog(conditions []string, set component.TelemetrySettings) (expr.BoolExpr[ottllog.TransformContext], error) {
statmentsStr := conditionsToStatements(conditions)
parser := ottllog.NewParser(functions[ottllog.TransformContext](), set)
statements, err := parser.ParseStatements(statmentsStr)
if err != nil {
return nil, err
}
return statementsToExpr(statements), nil
}

func ParseMetric(conditions []string, set component.TelemetrySettings) (expr.BoolExpr[ottlmetric.TransformContext], error) {
statmentsStr := conditionsToStatements(conditions)
parser := ottlmetric.NewParser(functions[ottlmetric.TransformContext](), set)
statements, err := parser.ParseStatements(statmentsStr)
if err != nil {
return nil, err
}
return statementsToExpr(statements), nil
}

func ParseDataPoint(conditions []string, set component.TelemetrySettings) (expr.BoolExpr[ottldatapoint.TransformContext], error) {
statmentsStr := conditionsToStatements(conditions)
parser := ottldatapoint.NewParser(functions[ottldatapoint.TransformContext](), set)
statements, err := parser.ParseStatements(statmentsStr)
if err != nil {
return nil, err
}
return statementsToExpr(statements), nil
}

func conditionsToStatements(conditions []string) []string {
statements := make([]string, len(conditions))
for i, condition := range conditions {
validStatements[i] = fmt.Sprintf(functionWithCondition, condition)
statements[i] = "drop() where " + condition
}
return statements
}

type statementExpr[K any] struct {
statement *ottl.Statement[K]
}

func (se statementExpr[K]) Eval(ctx context.Context, tCtx K) (bool, error) {
_, ret, err := se.statement.Execute(ctx, tCtx)
return ret, err
}

func statementsToExpr[K any](statements []*ottl.Statement[K]) expr.BoolExpr[K] {
var rets []expr.BoolExpr[K]
for _, statement := range statements {
rets = append(rets, statementExpr[K]{statement: statement})
}
return expr.Or(rets...)
}

func functions[K any]() map[string]interface{} {
return map[string]interface{}{
"TraceID": ottlfuncs.TraceID[K],
"SpanID": ottlfuncs.SpanID[K],
"IsMatch": ottlfuncs.IsMatch[K],
"Concat": ottlfuncs.Concat[K],
"Split": ottlfuncs.Split[K],
"Int": ottlfuncs.Int[K],
"ConvertCase": ottlfuncs.ConvertCase[K],
"drop": func() (ottl.ExprFunc[K], error) {
return func(context.Context, K) (interface{}, error) {
return true, nil
}, nil
},
}
return validStatements
}
Loading

0 comments on commit 906f411

Please sign in to comment.