Skip to content

Commit

Permalink
[processor/filter] fixed vm usage in filterexpr (#16308)
Browse files Browse the repository at this point in the history
* [processor/filter] parallel test for expr matcher

* added failing test to filterexpr package to highlight issue with
parallel usage of expression filter

* [processor/filter] fixed vm usage in filterexpr

* Instead of reusing the same vm for all execution of
Matcher.MatchMetric in filterexpr (which is not gorutine-safe), use
a sync.Pool for managing vms.

* added changelog entry

* Update .chloggen/filterexpr-parallel-fix.yaml

Co-authored-by: Pablo Baeyens <[email protected]>

Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
adam-kiss-sg and mx-psi committed Dec 2, 2022
1 parent b76d544 commit 97530ad
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 22 deletions.
16 changes: 16 additions & 0 deletions .chloggen/filterexpr-parallel-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filterexpr

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fixed filterexpr Matcher.MatchMetric to be thread-safe

# One or more tracking issues related to the change
issues: [13573]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
50 changes: 29 additions & 21 deletions internal/filter/filterexpr/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ package filterexpr // import "github.com/open-telemetry/opentelemetry-collector-

import (
"fmt"
"sync"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

var vmPool = sync.Pool{
New: func() interface{} {
return &vm.VM{}
},
}

type Matcher struct {
program *vm.Program
v vm.VM
}

type env struct {
Expand All @@ -49,31 +55,33 @@ func NewMatcher(expression string) (*Matcher, error) {
if err != nil {
return nil, err
}
return &Matcher{program: program, v: vm.VM{}}, nil
return &Matcher{program: program}, nil
}

func (m *Matcher) MatchMetric(metric pmetric.Metric) (bool, error) {
metricName := metric.Name()
vm := vmPool.Get().(*vm.VM)
defer vmPool.Put(vm)
switch metric.Type() {
case pmetric.MetricTypeGauge:
return m.matchGauge(metricName, metric.Gauge())
return m.matchGauge(metricName, metric.Gauge(), vm)
case pmetric.MetricTypeSum:
return m.matchSum(metricName, metric.Sum())
return m.matchSum(metricName, metric.Sum(), vm)
case pmetric.MetricTypeHistogram:
return m.matchHistogram(metricName, metric.Histogram())
return m.matchHistogram(metricName, metric.Histogram(), vm)
case pmetric.MetricTypeExponentialHistogram:
return m.matchExponentialHistogram(metricName, metric.ExponentialHistogram())
return m.matchExponentialHistogram(metricName, metric.ExponentialHistogram(), vm)
case pmetric.MetricTypeSummary:
return m.matchSummary(metricName, metric.Summary())
return m.matchSummary(metricName, metric.Summary(), vm)
default:
return false, nil
}
}

func (m *Matcher) matchGauge(metricName string, gauge pmetric.Gauge) (bool, error) {
func (m *Matcher) matchGauge(metricName string, gauge pmetric.Gauge, vm *vm.VM) (bool, error) {
pts := gauge.DataPoints()
for i := 0; i < pts.Len(); i++ {
matched, err := m.matchEnv(metricName, pmetric.MetricTypeGauge, pts.At(i).Attributes())
matched, err := m.matchEnv(metricName, pmetric.MetricTypeGauge, pts.At(i).Attributes(), vm)
if err != nil {
return false, err
}
Expand All @@ -84,10 +92,10 @@ func (m *Matcher) matchGauge(metricName string, gauge pmetric.Gauge) (bool, erro
return false, nil
}

func (m *Matcher) matchSum(metricName string, sum pmetric.Sum) (bool, error) {
func (m *Matcher) matchSum(metricName string, sum pmetric.Sum, vm *vm.VM) (bool, error) {
pts := sum.DataPoints()
for i := 0; i < pts.Len(); i++ {
matched, err := m.matchEnv(metricName, pmetric.MetricTypeSum, pts.At(i).Attributes())
matched, err := m.matchEnv(metricName, pmetric.MetricTypeSum, pts.At(i).Attributes(), vm)
if err != nil {
return false, err
}
Expand All @@ -98,10 +106,10 @@ func (m *Matcher) matchSum(metricName string, sum pmetric.Sum) (bool, error) {
return false, nil
}

func (m *Matcher) matchHistogram(metricName string, histogram pmetric.Histogram) (bool, error) {
func (m *Matcher) matchHistogram(metricName string, histogram pmetric.Histogram, vm *vm.VM) (bool, error) {
pts := histogram.DataPoints()
for i := 0; i < pts.Len(); i++ {
matched, err := m.matchEnv(metricName, pmetric.MetricTypeHistogram, pts.At(i).Attributes())
matched, err := m.matchEnv(metricName, pmetric.MetricTypeHistogram, pts.At(i).Attributes(), vm)
if err != nil {
return false, err
}
Expand All @@ -112,10 +120,10 @@ func (m *Matcher) matchHistogram(metricName string, histogram pmetric.Histogram)
return false, nil
}

func (m *Matcher) matchExponentialHistogram(metricName string, eh pmetric.ExponentialHistogram) (bool, error) {
func (m *Matcher) matchExponentialHistogram(metricName string, eh pmetric.ExponentialHistogram, vm *vm.VM) (bool, error) {
pts := eh.DataPoints()
for i := 0; i < pts.Len(); i++ {
matched, err := m.matchEnv(metricName, pmetric.MetricTypeExponentialHistogram, pts.At(i).Attributes())
matched, err := m.matchEnv(metricName, pmetric.MetricTypeExponentialHistogram, pts.At(i).Attributes(), vm)
if err != nil {
return false, err
}
Expand All @@ -126,10 +134,10 @@ func (m *Matcher) matchExponentialHistogram(metricName string, eh pmetric.Expone
return false, nil
}

func (m *Matcher) matchSummary(metricName string, summary pmetric.Summary) (bool, error) {
func (m *Matcher) matchSummary(metricName string, summary pmetric.Summary, vm *vm.VM) (bool, error) {
pts := summary.DataPoints()
for i := 0; i < pts.Len(); i++ {
matched, err := m.matchEnv(metricName, pmetric.MetricTypeSummary, pts.At(i).Attributes())
matched, err := m.matchEnv(metricName, pmetric.MetricTypeSummary, pts.At(i).Attributes(), vm)
if err != nil {
return false, err
}
Expand All @@ -140,16 +148,16 @@ func (m *Matcher) matchSummary(metricName string, summary pmetric.Summary) (bool
return false, nil
}

func (m *Matcher) matchEnv(metricName string, metricType pmetric.MetricType, attributes pcommon.Map) (bool, error) {
func (m *Matcher) matchEnv(metricName string, metricType pmetric.MetricType, attributes pcommon.Map, vm *vm.VM) (bool, error) {
return m.match(env{
MetricName: metricName,
MetricType: metricType.String(),
attributes: attributes,
})
}, vm)
}

func (m *Matcher) match(env env) (bool, error) {
result, err := m.v.Run(m.program, &env)
func (m *Matcher) match(env env, vm *vm.VM) (bool, error) {
result, err := vm.Run(m.program, &env)
if err != nil {
return false, err
}
Expand Down
32 changes: 31 additions & 1 deletion internal/filter/filterexpr/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package filterexpr

import (
"sync"
"testing"

"github.com/antonmedv/expr/vm"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -30,7 +32,7 @@ func TestCompileExprError(t *testing.T) {
func TestRunExprError(t *testing.T) {
matcher, err := NewMatcher("foo")
require.NoError(t, err)
matched, _ := matcher.match(env{})
matched, _ := matcher.match(env{}, &vm.VM{})
require.False(t, matched)
}

Expand Down Expand Up @@ -224,3 +226,31 @@ func matchHistogram(t *testing.T, metricName string) bool {
assert.NoError(t, err)
return matched
}

func TestParallel(t *testing.T) {
matcher, err := NewMatcher(`MetricName == 'my.metric' && MetricType == 'Sum'`)
require.NoError(t, err)

wg := &sync.WaitGroup{}
start := make(chan struct{})
testMetric := func(t *testing.T, count int) {
defer wg.Done()
<-start
for i := 0; i < count; i++ {
m := pmetric.NewMetric()
m.SetName("my.metric")
m.SetEmptySum().DataPoints().AppendEmpty()
matched, err := matcher.MatchMetric(m)
assert.NoError(t, err)
assert.True(t, matched)
}
}

for i := 0; i < 20; i++ {
wg.Add(1)
go testMetric(t, 20)
}

close(start)
wg.Wait()
}

0 comments on commit 97530ad

Please sign in to comment.