Skip to content

Commit

Permalink
[processor/transform] Allow specifying functions per context (open-te…
Browse files Browse the repository at this point in the history
…lemetry#16251)

* Allow specifying functions per context
  • Loading branch information
TylerHelmuth committed Nov 15, 2022
1 parent ce37a30 commit f031b3b
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 32 deletions.
16 changes: 16 additions & 0 deletions .chloggen/tp-functions-per-context.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where the metric context was using datapoint functions.

# One or more tracking issues related to the change
issues: [16251]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
14 changes: 7 additions & 7 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func (c *Config) Validate() error {
}

if len(c.Traces.Statements) > 0 {
ottlspanp := ottlspan.NewParser(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
ottlspanp := ottlspan.NewParser(traces.SpanFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlspanp.ParseStatements(c.Traces.Statements)
if err != nil {
return err
}
}

if len(c.TraceStatements) > 0 {
pc, err := common.NewTraceParserCollection(traces.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewTraceParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithSpanParser(traces.SpanFunctions()), common.WithSpanEventParser(traces.SpanEventFunctions()))
if err != nil {
return err
}
Expand All @@ -81,15 +81,15 @@ func (c *Config) Validate() error {
}

if len(c.Metrics.Statements) > 0 {
ottldatapointp := ottldatapoint.NewParser(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottldatapointp.ParseStatements(c.Metrics.Statements)
ottlmetricsp := ottldatapoint.NewParser(metrics.DataPointFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottlmetricsp.ParseStatements(c.Metrics.Statements)
if err != nil {
return err
}
}

if len(c.MetricStatements) > 0 {
pc, err := common.NewMetricParserCollection(metrics.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewMetricParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithMetricParser(metrics.MetricFunctions()), common.WithDataPointParser(metrics.DataPointFunctions()))
if err != nil {
return err
}
Expand All @@ -102,15 +102,15 @@ func (c *Config) Validate() error {
}

if len(c.Logs.Statements) > 0 {
ottllogsp := ottllog.NewParser(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
ottllogsp := ottllog.NewParser(logs.LogFunctions(), component.TelemetrySettings{Logger: zap.NewNop()})
_, err := ottllogsp.ParseStatements(c.Logs.Statements)
if err != nil {
return err
}
}

if len(c.LogStatements) > 0 {
pc, err := common.NewLogParserCollection(logs.Functions(), component.TelemetrySettings{Logger: zap.NewNop()})
pc, err := common.NewLogParserCollection(component.TelemetrySettings{Logger: zap.NewNop()}, common.WithLogParser(logs.LogFunctions()))
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,20 @@ type LogParserCollection struct {

type LogParserCollectionOption func(*LogParserCollection) error

func NewLogParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
func WithLogParser(functions map[string]interface{}) LogParserCollectionOption {
return func(lp *LogParserCollection) error {
lp.logParser = ottllog.NewParser(functions, lp.settings)
return nil
}
}

func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
lpc := &LogParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
logParser: ottllog.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
18 changes: 15 additions & 3 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,27 @@ type MetricParserCollection struct {

type MetricParserCollectionOption func(*MetricParserCollection) error

func NewMetricParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
func WithMetricParser(functions map[string]interface{}) MetricParserCollectionOption {
return func(mp *MetricParserCollection) error {
mp.metricParser = ottlmetric.NewParser(functions, mp.settings)
return nil
}
}

func WithDataPointParser(functions map[string]interface{}) MetricParserCollectionOption {
return func(mp *MetricParserCollection) error {
mp.dataPointParser = ottldatapoint.NewParser(functions, mp.settings)
return nil
}
}

func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
mpc := &MetricParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
metricParser: ottlmetric.NewParser(functions, settings),
dataPointParser: ottldatapoint.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
17 changes: 15 additions & 2 deletions processor/transformprocessor/internal/common/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,27 @@ type TraceParserCollection struct {

type TraceParserCollectionOption func(*TraceParserCollection) error

func NewTraceParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
func WithSpanParser(functions map[string]interface{}) TraceParserCollectionOption {
return func(tp *TraceParserCollection) error {
tp.spanParser = ottlspan.NewParser(functions, tp.settings)
return nil
}
}

func WithSpanEventParser(functions map[string]interface{}) TraceParserCollectionOption {
return func(tp *TraceParserCollection) error {
tp.spanEventParser = ottlspanevent.NewParser(functions, tp.settings)
return nil
}
}

func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
tpc := &TraceParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings),
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings),
},
spanParser: ottlspan.NewParser(functions, settings),
}

for _, op := range options {
Expand Down
2 changes: 1 addition & 1 deletion processor/transformprocessor/internal/logs/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Functions() map[string]interface{} {
func LogFunctions() map[string]interface{} {
// No logs-only functions yet.
return common.Functions[ottllog.TransformContext]()
}
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/logs/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_LogFunctions(t *testing.T) {
expected := common.Functions[ottllog.TransformContext]()
actual := Functions()
actual := LogFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottllog.NewParser(Functions(), settings)
ottlp := ottllog.NewParser(LogFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewLogParserCollection(Functions(), settings)
pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions()))
if err != nil {
return nil, err
}
Expand Down
13 changes: 9 additions & 4 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
var datapointRegistry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
"convert_summary_sum_val_to_sum": convertSummarySumValToSum,
Expand All @@ -30,10 +31,14 @@ var registry = map[string]interface{}{
func init() {
// Init metrics registry with default functions common to all signals
for k, v := range common.Functions[ottldatapoint.TransformContext]() {
registry[k] = v
datapointRegistry[k] = v
}
}

func Functions() map[string]interface{} {
return registry
func DataPointFunctions() map[string]interface{} {
return datapointRegistry
}

func MetricFunctions() map[string]interface{} {
return common.Functions[ottlmetric.TransformContext]()
}
14 changes: 12 additions & 2 deletions processor/transformprocessor/internal/metrics/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,30 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_DataPointFunctions(t *testing.T) {
expected := common.Functions[ottldatapoint.TransformContext]()
expected["convert_sum_to_gauge"] = convertSumToGauge
expected["convert_gauge_to_sum"] = convertGaugeToSum
expected["convert_summary_sum_val_to_sum"] = convertSummarySumValToSum
expected["convert_summary_count_val_to_sum"] = convertSummaryCountValToSum

actual := Functions()
actual := DataPointFunctions()

require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}

func Test_MetricFunctions(t *testing.T) {
expected := common.Functions[ottlmetric.TransformContext]()
actual := MetricFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/metrics/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottldatapoint.NewParser(Functions(), settings)
ottlp := ottldatapoint.NewParser(DataPointFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -45,7 +45,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewMetricParserCollection(Functions(), settings)
pc, err := common.NewMetricParserCollection(settings, common.WithMetricParser(MetricFunctions()), common.WithDataPointParser(DataPointFunctions()))
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion processor/transformprocessor/internal/traces/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Functions() map[string]interface{} {
func SpanFunctions() map[string]interface{} {
// No trace-only functions yet.
return common.Functions[ottlspan.TransformContext]()
}

func SpanEventFunctions() map[string]interface{} {
// No trace-only functions yet.
return common.Functions[ottlspanevent.TransformContext]()
}
14 changes: 12 additions & 2 deletions processor/transformprocessor/internal/traces/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

func Test_DefaultFunctions(t *testing.T) {
func Test_SpanFunctions(t *testing.T) {
expected := common.Functions[ottlspan.TransformContext]()
actual := Functions()
actual := SpanFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
}
}

func Test_SpanEventFunctions(t *testing.T) {
expected := common.Functions[ottlspanevent.TransformContext]()
actual := SpanEventFunctions()
require.Equal(t, len(expected), len(actual))
for k := range actual {
assert.Contains(t, expected, k)
Expand Down
4 changes: 2 additions & 2 deletions processor/transformprocessor/internal/traces/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Processor struct {

func NewProcessor(statements []string, contextStatements []common.ContextStatements, settings component.TelemetrySettings) (*Processor, error) {
if len(statements) > 0 {
ottlp := ottlspan.NewParser(Functions(), settings)
ottlp := ottlspan.NewParser(SpanFunctions(), settings)
parsedStatements, err := ottlp.ParseStatements(statements)
if err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func NewProcessor(statements []string, contextStatements []common.ContextStateme
}, nil
}

pc, err := common.NewTraceParserCollection(Functions(), settings)
pc, err := common.NewTraceParserCollection(settings, common.WithSpanParser(SpanFunctions()), common.WithSpanEventParser(SpanEventFunctions()))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f031b3b

Please sign in to comment.