Skip to content

Commit

Permalink
[processor/transform] Add 'transform.flatten.logs' feature gate (open…
Browse files Browse the repository at this point in the history
…-telemetry#33338)

This PR proposes a feature gate which would enable the Flatten/Unflatten
behavior described
[here](open-telemetry#32080 (comment)).
This was discussed in the Collector SIG recently and it was mentioned
that a feature gate would be a reasonable way to implement this.

One immediate question: Should this be purely a feature gate, or should
there be a config option on the processor which fails `Validate` if the
feature gate is not set?

---------

Co-authored-by: Curtis Robert <[email protected]>
Co-authored-by: Evan Bradley <[email protected]>
Co-authored-by: Tyler Helmuth <[email protected]>
  • Loading branch information
4 people committed Jun 13, 2024
1 parent 1b172e7 commit b03a61a
Show file tree
Hide file tree
Showing 17 changed files with 320 additions and 7 deletions.
28 changes: 28 additions & 0 deletions .chloggen/transform-flatten-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transform

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `transform.flatten.logs` featuregate to give each log record a distinct resource and scope.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32080]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This option is useful when applying transformations which alter the resource or scope. e.g. `set(resource.attributes["to"], attributes["from"])`, which may otherwise result in unexpected behavior. Using this option typically incurs a performance penalty as the processor must compute many hashes and create copies of resource and scope information for every log record.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -479,3 +479,4 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider
- github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.102.0 // indirect
Expand Down Expand Up @@ -1297,3 +1298,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provid
replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
2 changes: 2 additions & 0 deletions connector/datadogconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,5 @@ replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
3 changes: 3 additions & 0 deletions exporter/datadogexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.102.0 // indirect
Expand Down Expand Up @@ -430,3 +431,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
2 changes: 2 additions & 0 deletions exporter/datadogexporter/integrationtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prome
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../../processor/transformprocessor

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../../pkg/sampling

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../../internal/pdatautil
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.102.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.102.0 // indirect
Expand Down Expand Up @@ -1237,3 +1238,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/acke
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ./receiver/splunkenterprisereceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ./pkg/sampling

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ./internal/pdatautil
25 changes: 25 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,28 @@ The transform processor uses the [OpenTelemetry Transformation Language](https:/
- Although the OTTL allows the `set` function to be used with `metric.data_type`, its implementation in the transform processor is NOOP. To modify a data type you must use a function specific to that purpose.
- [Identity Conflict](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#identity-conflict): Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing existing attributes. Adding new attributes is safe.
- [Orphaned Telemetry](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#orphaned-telemetry): The processor allows you to modify `span_id`, `trace_id`, and `parent_span_id` for traces and `span_id`, and `trace_id` logs. Modifying these fields could lead to orphaned spans or logs.

## Feature Gate

### `transform.flatten.logs`

The `transform.flatten.logs` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) enables the `flatten_data` configuration option (default `false`). With `flatten_data: true`, the processor provides each log record with a distinct copy of its resource and scope. Then, after applying all transformations, the log records are regrouped by resource and scope.

This option is useful when applying transformations which alter the resource or scope. e.g. `set(resource.attributes["to"], attributes["from"])`, which may otherwise result in unexpected behavior. Using this option typically incurs a performance penalty as the processor must compute many hashes and create copies of resource and scope information for every log record.

The feature is currently only available for log processing.

#### Example Usage

`config.yaml`:

```yaml
transform:
flatten_data: true
log_statements:
- context: log
statements:
- set(resource.attributes["to"], attributes["from"])
```

Run collector: `./otelcol --config config.yaml --feature-gates=transform.flatten.logs`
18 changes: 18 additions & 0 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package transformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"

import (
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -15,6 +18,15 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"
)

var (
flatLogsFeatureGate = featuregate.GlobalRegistry().MustRegister("transform.flatten.logs", featuregate.StageAlpha,
featuregate.WithRegisterDescription("Flatten log data prior to transformation so every record has a unique copy of the resource and scope. Regroups logs based on resource and scope after transformations."),
featuregate.WithRegisterFromVersion("v0.103.0"),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"),
)
errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled")
)

// Config defines the configuration for the processor.
type Config struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
Expand All @@ -27,6 +39,8 @@ type Config struct {
TraceStatements []common.ContextStatements `mapstructure:"trace_statements"`
MetricStatements []common.ContextStatements `mapstructure:"metric_statements"`
LogStatements []common.ContextStatements `mapstructure:"log_statements"`

FlattenData bool `mapstructure:"flatten_data"`
}

var _ component.Config = (*Config)(nil)
Expand Down Expand Up @@ -73,5 +87,9 @@ func (c *Config) Validate() error {
}
}

if c.FlattenData && !flatLogsFeatureGate.IsEnabled() {
errors = multierr.Append(errors, errFlatLogsGateDisabled)
}

return errors
}
2 changes: 1 addition & 1 deletion processor/transformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func createLogsProcessor(
) (processor.Logs, error) {
oCfg := cfg.(*Config)

proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, set.TelemetrySettings)
proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, oCfg.FlattenData, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions processor/transformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.21.0
require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.102.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.102.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -86,3 +88,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden =>
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/common => ../../internal/common

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../internal/pdatautil
9 changes: 8 additions & 1 deletion processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

type Processor struct {
contexts []consumer.Logs
logger *zap.Logger
flatMode bool
}

func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, settings component.TelemetrySettings) (*Processor, error) {
func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.ErrorMode, flatMode bool, settings component.TelemetrySettings) (*Processor, error) {
pc, err := common.NewLogParserCollection(settings, common.WithLogParser(LogFunctions()), common.WithLogErrorMode(errorMode))
if err != nil {
return nil, err
Expand All @@ -44,10 +46,15 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E
return &Processor{
contexts: contexts,
logger: settings.Logger,
flatMode: flatMode,
}, nil
}

func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
if p.flatMode {
pdatautil.FlattenLogs(ld.ResourceLogs())
defer pdatautil.GroupByResourceLogs(ld.ResourceLogs())
}
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions processor/transformprocessor/internal/logs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -84,7 +84,7 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -338,7 +338,7 @@ func Test_ProcessLogs_LogContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.statement, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -455,7 +455,7 @@ func Test_ProcessLogs_MixContext(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down Expand Up @@ -488,7 +488,7 @@ func Test_ProcessTraces_Error(t *testing.T) {
for _, tt := range tests {
t.Run(string(tt.context), func(t *testing.T) {
td := constructLogs()
processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings())
processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, false, componenttest.NewNopTelemetrySettings())
assert.NoError(t, err)

_, err = processor.ProcessLogs(context.Background(), td)
Expand Down
Loading

0 comments on commit b03a61a

Please sign in to comment.