diff --git a/.chloggen/tp-functions-per-context.yaml b/.chloggen/tp-functions-per-context.yaml new file mode 100755 index 0000000000000..1b815aa0fbf62 --- /dev/null +++ b/.chloggen/tp-functions-per-context.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where the metric context was using datapoint functions. + +# One or more tracking issues related to the change +issues: [16251] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 3dad6667752c5..48a9925bdc4ce 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -60,7 +60,7 @@ func (c *Config) Validate() error { } if len(c.Traces.Statements) > 0 { - ottlspanp := ottlspan.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + ottlspanp := ottlspan.NewParser(traces.SpanFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) _, err := ottlspanp.ParseStatements(c.Traces.Statements) if err != nil { return err @@ -68,7 +68,7 @@ func (c *Config) Validate() error { } if len(c.TraceStatements) > 0 { - pc, err := common.NewTraceParserCollection(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions())) if err != nil { return err } @@ -81,15 +81,15 @@ func (c *Config) Validate() error { } if len(c.Metrics.Statements) > 0 { - ottldatapointp := ottldatapoint.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) - _, err := ottldatapointp.ParseStatements(c.Metrics.Statements) + ottlmetricsp := ottldatapoint.NewParser(metrics.DataPointFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) + _, err := ottlmetricsp.ParseStatements(c.Metrics.Statements) if err != nil { return err } } if len(c.MetricStatements) > 0 { - pc, err := common.NewMetricParserCollection(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions())) if err != nil { return err } @@ -102,7 +102,7 @@ func (c *Config) Validate() error { } if len(c.Logs.Statements) > 0 { - ottllogsp := ottllog.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + ottllogsp := ottllog.NewParser(logs.LogFunctions(), component.TelemetrySettings{Logger: zap.NewNop()}) _, err := ottllogsp.ParseStatements(c.Logs.Statements) if err != nil { return err @@ -110,7 +110,7 @@ func (c *Config) Validate() error { } if len(c.LogStatements) > 0 { - pc, err := common.NewLogParserCollection(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()}) + pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions())) if err != nil { return err } diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index fcd4e96e8958b..8267b2dcbe4d5 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -64,14 +64,20 @@ type LogParserCollection struct { type LogParserCollectionOption func(*LogParserCollection) error -func NewLogParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { +func WithLogParser(functions map[string]interface{}) LogParserCollectionOption { + return func(lp *LogParserCollection) error { + lp.logParser = ottllog.NewParser(functions, lp.settings) + return nil + } +} + +func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { lpc := &LogParserCollection{ parserCollection: parserCollection{ settings: settings, resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), }, - logParser: ottllog.NewParser(functions, settings), } for _, op := range options { diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index df6053cb5053a..097818dffb510 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -161,15 +161,27 @@ type MetricParserCollection struct { type MetricParserCollectionOption func(*MetricParserCollection) error -func NewMetricParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { +func WithMetricParser(functions map[string]interface{}) MetricParserCollectionOption { + return func(mp *MetricParserCollection) error { + mp.metricParser = ottlmetric.NewParser(functions, mp.settings) + return nil + } +} + +func WithDataPointParser(functions map[string]interface{}) MetricParserCollectionOption { + return func(mp *MetricParserCollection) error { + mp.dataPointParser = ottldatapoint.NewParser(functions, mp.settings) + return nil + } +} + +func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { mpc := &MetricParserCollection{ parserCollection: parserCollection{ settings: settings, resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), }, - metricParser: ottlmetric.NewParser(functions, settings), - dataPointParser: ottldatapoint.NewParser(functions, settings), } for _, op := range options { diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 89c8f57f520cd..9a5c4b7fb9272 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -100,14 +100,27 @@ type TraceParserCollection struct { type TraceParserCollectionOption func(*TraceParserCollection) error -func NewTraceParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { +func WithSpanParser(functions map[string]interface{}) TraceParserCollectionOption { + return func(tp *TraceParserCollection) error { + tp.spanParser = ottlspan.NewParser(functions, tp.settings) + return nil + } +} + +func WithSpanEventParser(functions map[string]interface{}) TraceParserCollectionOption { + return func(tp *TraceParserCollection) error { + tp.spanEventParser = ottlspanevent.NewParser(functions, tp.settings) + return nil + } +} + +func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { tpc := &TraceParserCollection{ parserCollection: parserCollection{ settings: settings, resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), }, - spanParser: ottlspan.NewParser(functions, settings), } for _, op := range options { diff --git a/processor/transformprocessor/internal/logs/functions.go b/processor/transformprocessor/internal/logs/functions.go index 1f7ba07822549..6378c293b3d9d 100644 --- a/processor/transformprocessor/internal/logs/functions.go +++ b/processor/transformprocessor/internal/logs/functions.go @@ -19,7 +19,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func Functions() map[string]interface{} { +func LogFunctions() map[string]interface{} { // No logs-only functions yet. return common.Functions[ottllog.TransformContext]() } diff --git a/processor/transformprocessor/internal/logs/functions_test.go b/processor/transformprocessor/internal/logs/functions_test.go index 8190b902879a0..c81e886605866 100644 --- a/processor/transformprocessor/internal/logs/functions_test.go +++ b/processor/transformprocessor/internal/logs/functions_test.go @@ -24,9 +24,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func Test_DefaultFunctions(t *testing.T) { +func Test_LogFunctions(t *testing.T) { expected := common.Functions[ottllog.TransformContext]() - actual := Functions() + actual := LogFunctions() require.Equal(t, len(expected), len(actual)) for k := range actual { assert.Contains(t, expected, k) diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index a6c16b17330c6..f89939c8ca263 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -34,7 +34,7 @@ type Processor struct { func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { if len(statements) > 0 { - ottlp := ottllog.NewParser(Functions(), settings) + ottlp := ottllog.NewParser(LogFunctions(), settings) parsedStatements, err := ottlp.ParseStatements(statements) if err != nil { return nil, err @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme }, nil } - pc, err := common.NewLogParserCollection(Functions(), settings) + pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions())) if err != nil { return nil, err } diff --git a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index 99ff965505cce..e9459380ef310 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -16,11 +16,12 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) // registry is a map of names to functions for metrics pipelines -var registry = map[string]interface{}{ +var datapointRegistry = map[string]interface{}{ "convert_sum_to_gauge": convertSumToGauge, "convert_gauge_to_sum": convertGaugeToSum, "convert_summary_sum_val_to_sum": convertSummarySumValToSum, @@ -30,10 +31,14 @@ var registry = map[string]interface{}{ func init() { // Init metrics registry with default functions common to all signals for k, v := range common.Functions[ottldatapoint.TransformContext]() { - registry[k] = v + datapointRegistry[k] = v } } -func Functions() map[string]interface{} { - return registry +func DataPointFunctions() map[string]interface{} { + return datapointRegistry +} + +func MetricFunctions() map[string]interface{} { + return common.Functions[ottlmetric.TransformContext]() } diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index 5dc007fd0eb52..5ded6a8b37a51 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -21,20 +21,30 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func Test_DefaultFunctions(t *testing.T) { +func Test_DataPointFunctions(t *testing.T) { expected := common.Functions[ottldatapoint.TransformContext]() expected["convert_sum_to_gauge"] = convertSumToGauge expected["convert_gauge_to_sum"] = convertGaugeToSum expected["convert_summary_sum_val_to_sum"] = convertSummarySumValToSum expected["convert_summary_count_val_to_sum"] = convertSummaryCountValToSum - actual := Functions() + actual := DataPointFunctions() require.Equal(t, len(expected), len(actual)) for k := range actual { assert.Contains(t, expected, k) } } + +func Test_MetricFunctions(t *testing.T) { + expected := common.Functions[ottlmetric.TransformContext]() + actual := MetricFunctions() + require.Equal(t, len(expected), len(actual)) + for k := range actual { + assert.Contains(t, expected, k) + } +} diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index 4e4e6e7e17a45..f84bdbfaca19a 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -35,7 +35,7 @@ type Processor struct { func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { if len(statements) > 0 { - ottlp := ottldatapoint.NewParser(Functions(), settings) + ottlp := ottldatapoint.NewParser(DataPointFunctions(), settings) parsedStatements, err := ottlp.ParseStatements(statements) if err != nil { return nil, err @@ -45,7 +45,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme }, nil } - pc, err := common.NewMetricParserCollection(Functions(), settings) + pc, err := common.NewMetricParserCollection(settings, common.WithMetricParser(MetricFunctions()), common.WithDataPointParser(DataPointFunctions())) if err != nil { return nil, err } diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 1d6fe68dc50b9..31c2b9f57b3c9 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -16,10 +16,16 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func Functions() map[string]interface{} { +func SpanFunctions() map[string]interface{} { // No trace-only functions yet. return common.Functions[ottlspan.TransformContext]() } + +func SpanEventFunctions() map[string]interface{} { + // No trace-only functions yet. + return common.Functions[ottlspanevent.TransformContext]() +} diff --git a/processor/transformprocessor/internal/traces/functions_test.go b/processor/transformprocessor/internal/traces/functions_test.go index 715184d95c012..207fd550984ae 100644 --- a/processor/transformprocessor/internal/traces/functions_test.go +++ b/processor/transformprocessor/internal/traces/functions_test.go @@ -21,12 +21,22 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func Test_DefaultFunctions(t *testing.T) { +func Test_SpanFunctions(t *testing.T) { expected := common.Functions[ottlspan.TransformContext]() - actual := Functions() + actual := SpanFunctions() + require.Equal(t, len(expected), len(actual)) + for k := range actual { + assert.Contains(t, expected, k) + } +} + +func Test_SpanEventFunctions(t *testing.T) { + expected := common.Functions[ottlspanevent.TransformContext]() + actual := SpanEventFunctions() require.Equal(t, len(expected), len(actual)) for k := range actual { assert.Contains(t, expected, k) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index efae98c2eae82..525ae4f20a597 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -34,7 +34,7 @@ type Processor struct { func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) { if len(statements) > 0 { - ottlp := ottlspan.NewParser(Functions(), settings) + ottlp := ottlspan.NewParser(SpanFunctions(), settings) parsedStatements, err := ottlp.ParseStatements(statements) if err != nil { return nil, err @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme }, nil } - pc, err := common.NewTraceParserCollection(Functions(), settings) + pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions())) if err != nil { return nil, err }