Skip to content

Commit

Permalink
[chore][pkg/stanza] Cleanup filter operator files (open-telemetry#32062)
Browse files Browse the repository at this point in the history
Contributes to open-telemetry#32058 

Also adds config unmarshal tests to this operator.
  • Loading branch information
djaglowski committed Apr 3, 2024
1 parent aeeb044 commit a128aed
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter"

import (
"context"
"crypto/rand"
"fmt"
"math/big"

"github.com/expr-lang/expr/vm"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)
Expand Down Expand Up @@ -70,44 +67,3 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
dropCutoff: big.NewInt(int64(c.DropRatio * 1000)),
}, nil
}

// Transformer is an operator that filters entries based on matching expressions
type Transformer struct {
helper.TransformerOperator
expression *vm.Program
dropCutoff *big.Int // [0..1000)
}

// Process will drop incoming entries that match the filter expression
func (f *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
env := helper.GetExprEnv(entry)
defer helper.PutExprEnv(env)

matches, err := vm.Run(f.expression, env)
if err != nil {
f.Errorf("Running expressing returned an error", zap.Error(err))
return nil
}

filtered, ok := matches.(bool)
if !ok {
f.Errorf("Expression did not compile as a boolean")
return nil
}

if !filtered {
f.Write(ctx, entry)
return nil
}

i, err := randInt(rand.Reader, upperBound)
if err != nil {
return err
}

if i.Cmp(f.dropCutoff) >= 0 {
f.Write(ctx, entry)
}

return nil
}
57 changes: 57 additions & 0 deletions pkg/stanza/operator/transformer/filter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter

import (
"path/filepath"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
)

// test unmarshalling of values into config struct
func TestUnmarshal(t *testing.T) {
operatortest.ConfigUnmarshalTests{
DefaultConfig: NewConfig(),
TestsFile: filepath.Join(".", "testdata", "config.yaml"),
Tests: []operatortest.ConfigUnmarshalTest{
{
Name: "default",
Expect: NewConfig(),
},
{
Name: "drop_ratio_0",
Expect: func() *Config {
cfg := NewConfig()
cfg.DropRatio = 0
return cfg
}(),
},
{
Name: "drop_ratio_half",
Expect: func() *Config {
cfg := NewConfig()
cfg.DropRatio = 0.5
return cfg
}(),
},
{
Name: "drop_ratio_1",
Expect: func() *Config {
cfg := NewConfig()
cfg.DropRatio = 1
return cfg
}(),
},
{
Name: "expr",
Expect: func() *Config {
cfg := NewConfig()
cfg.Expression = "body == 'value'"
return cfg
}(),
},
},
}.Run(t)
}
14 changes: 14 additions & 0 deletions pkg/stanza/operator/transformer/filter/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
default:
type: filter
drop_ratio_0:
type: filter
drop_ratio: 0
drop_ratio_half:
type: filter
drop_ratio: 0.5
drop_ratio_1:
type: filter
drop_ratio: 1
expr:
type: filter
expr: body == 'value'
57 changes: 57 additions & 0 deletions pkg/stanza/operator/transformer/filter/transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/filter"

import (
"context"
"crypto/rand"
"math/big"

"github.com/expr-lang/expr/vm"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)

// Transformer is an operator that filters entries based on matching expressions
type Transformer struct {
helper.TransformerOperator
expression *vm.Program
dropCutoff *big.Int // [0..1000)
}

// Process will drop incoming entries that match the filter expression
func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
env := helper.GetExprEnv(entry)
defer helper.PutExprEnv(env)

matches, err := vm.Run(t.expression, env)
if err != nil {
t.Errorf("Running expressing returned an error", zap.Error(err))
return nil
}

filtered, ok := matches.(bool)
if !ok {
t.Errorf("Expression did not compile as a boolean")
return nil
}

if !filtered {
t.Write(ctx, entry)
return nil
}

i, err := randInt(rand.Reader, upperBound)
if err != nil {
return err
}

if i.Cmp(t.dropCutoff) >= 0 {
t.Write(ctx, entry)
}

return nil
}

0 comments on commit a128aed

Please sign in to comment.