[processor/deltatocumulative]: drop samples of streams exceeding limit (open-telemetry#33286)
**Description:**
When at the stream limit, the `telemetry` component recorded
`dps_dropped{reason="at-limit"}` and set the error to nil.

This in turn let `Aggregate` call `CopyTo` on a zero-valued (effectively
nil) `pmetric.NumberDataPoint`, leading to a panic from a nil-pointer
dereference.

In this specific case the metric is not tracked: we are at the limit, so
there is nothing to `CopyTo` because the stored datapoint does not exist.

Added explicit signaling (`streams.Drop`) so those samples are dropped
altogether.
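The fix follows Go's sentinel-error pattern (compare `fs.SkipAll`): the handler returns `Drop` to mean "remove this datapoint, but don't treat it as a failure". A minimal, self-contained sketch of that pattern, with plain ints standing in for datapoints and a hypothetical `process` function:

```go
package main

import (
	"errors"
	"fmt"
)

// Drop signals an item should be removed, not reported as a failure.
var Drop = errors.New("stream dropped")

func main() {
	points := []int{1, 2, 3, 4}
	// hypothetical handler: drop even values, negate the rest
	process := func(v int) (int, error) {
		if v%2 == 0 {
			return 0, Drop
		}
		return -v, nil
	}

	var errs error
	kept := points[:0]
	for _, p := range points {
		next, err := process(p)
		if err != nil {
			if !errors.Is(err, Drop) { // Drop is a signal, not an error
				errs = errors.Join(errs, err)
			}
			continue // either way the point leaves the batch
		}
		kept = append(kept, next)
	}
	fmt.Println(kept, errs) // [-1 -3] <nil>
}
```

Using `errors.Is` keeps the check correct even if `Drop` is wrapped further up the stack.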



**Link to tracking Issue:** Fixes open-telemetry#33285

**Testing:** added `TestDrop`; adapted `TestAggregate`

**Documentation:** not needed
sh0rez committed Jun 10, 2024
1 parent 4b517cf commit ad61590
Showing 7 changed files with 113 additions and 31 deletions.
30 changes: 30 additions & 0 deletions .chloggen/deltatocumulative-drop-at-limit.yaml
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: properly drop samples when at limit

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33285]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  fixes a segfault in the limiting behavior, where streams exceeding the limit still had their samples processed.
  due to not being tracked, this led to a nil-pointer deref


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
20 changes: 20 additions & 0 deletions processor/deltatocumulativeprocessor/internal/metrics/data.go
@@ -4,6 +4,8 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"

import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
)

@@ -28,6 +30,12 @@ func (s Sum) Ident() Ident {
	return (*Metric)(&s).Ident()
}

func (s Sum) Filter(expr func(data.Number) bool) {
	s.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
		return !expr(data.Number{NumberDataPoint: dp})
	})
}

type Histogram Metric

func (s Histogram) At(i int) data.Histogram {
@@ -43,6 +51,12 @@ func (s Histogram) Ident() Ident {
	return (*Metric)(&s).Ident()
}

func (s Histogram) Filter(expr func(data.Histogram) bool) {
	s.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
		return !expr(data.Histogram{HistogramDataPoint: dp})
	})
}

type ExpHistogram Metric

func (s ExpHistogram) At(i int) data.ExpHistogram {
@@ -57,3 +71,9 @@ func (s ExpHistogram) Len() int {
func (s ExpHistogram) Ident() Ident {
	return (*Metric)(&s).Ident()
}

func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) {
	s.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
		return !expr(data.ExpHistogram{DataPoint: dp})
	})
}
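All three `Filter` methods keep exactly the datapoints for which `expr` returns true, by handing the negated predicate to pdata's `RemoveIf`. A standalone sketch of the same inversion, using only the public `pmetric` API:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	m := pmetric.NewMetric()
	dps := m.SetEmptySum().DataPoints()
	for i := 0; i < 4; i++ {
		dps.AppendEmpty().SetIntValue(int64(i))
	}

	// keep only even-valued points
	keep := func(dp pmetric.NumberDataPoint) bool { return dp.IntValue()%2 == 0 }

	// Filter-style inversion: remove whatever the predicate does not keep
	dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool { return !keep(dp) })

	for i := 0; i < dps.Len(); i++ {
		fmt.Println(dps.At(i).IntValue()) // 0, 2
	}
}
```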
28 changes: 21 additions & 7 deletions processor/deltatocumulativeprocessor/internal/streams/data.go
@@ -27,20 +27,34 @@ func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] {
	}
}

// Aggregate each point and replace it by the result
func Aggregate[D data.Point[D]](m metrics.Data[D], aggr Aggregator[D]) error {
type filterable[D data.Point[D]] interface {
	metrics.Data[D]
	Filter(func(D) bool)
}

// Apply does dps[i] = fn(dps[i]) for each item in dps.
// If fn returns [streams.Drop], the datapoint is removed from dps instead.
// If fn returns another error, the datapoint is also removed and the error is eventually returned.
func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, error)) error {
	var errs error

	// for id, dp := range Samples(m)
	Samples(m)(func(id Ident, dp D) bool {
		next, err := aggr.Aggregate(id, dp)
	mid := dps.Ident()
	dps.Filter(func(dp P) bool {
		id := identity.OfStream(mid, dp)
		next, err := fn(id, dp)
		if err != nil {
			errs = errors.Join(errs, Error(id, err))
			return true
			if !errors.Is(err, Drop) {
				errs = errors.Join(errs, err)
			}
			return false
		}

		next.CopyTo(dp)
		return true
	})

	return errs
}

// Drop signals the current item (stream or datapoint) is to be dropped
var Drop = errors.New("stream dropped") //nolint:revive // Drop is a good name for a signal, see fs.SkipAll
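A hypothetical call site shows the contract: returning `streams.Drop` removes the datapoint silently, while any other error removes it and is also collected. The `cutoff` value and `aggr` aggregator below are illustrative, not part of this change:

```go
// sketch only: assumes a pcommon.Timestamp cutoff and an aggregator aggr in scope
err := streams.Apply(metrics.Sum(m), func(id streams.Ident, dp data.Number) (data.Number, error) {
	if dp.Timestamp() < cutoff {
		return dp, streams.Drop // removed from the batch, not reported
	}
	return aggr.Aggregate(id, dp) // the result replaces dp via CopyTo
})
```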
54 changes: 34 additions & 20 deletions processor/deltatocumulativeprocessor/internal/streams/data_test.go
@@ -4,6 +4,7 @@
package streams_test

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
@@ -23,7 +24,7 @@ func BenchmarkSamples(b *testing.B) {
	dps := generate(b.N)
	b.ResetTimer()

	streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool {
	streams.Samples(dps)(func(id streams.Ident, dp data.Number) bool {
		rdp = dp
		rid = id
		return true
@@ -55,22 +56,6 @@ func BenchmarkSamples(b *testing.B) {
	})
}

func TestSample(t *testing.T) {
	const total = 1000
	dps := generate(total)

	// check that all samples are visited
	seen := 0
	streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool {
		require.Equal(t, dps.id, id)
		require.Equal(t, dps.dps[seen], dp)
		seen++
		return true
	})

	require.Equal(t, total, seen)
}

func TestAggregate(t *testing.T) {
	const total = 1000
	dps := generate(total)
@@ -82,7 +67,7 @@ func TestAggregate(t *testing.T) {
		return dp, nil
	})

	err := streams.Aggregate(dps, inv)
	err := streams.Apply(dps, inv.Aggregate)
	require.NoError(t, err)

	// check that all samples are inverted
@@ -91,15 +76,34 @@
	}
}

func generate(n int) Data {
func TestDrop(t *testing.T) {
	const total = 1000
	dps := generate(total)

	var want []data.Number
	maybe := aggr(func(_ streams.Ident, dp data.Number) (data.Number, error) {
		if rand.Intn(2) == 1 {
			want = append(want, dp)
			return dp, nil
		}
		return dp, streams.Drop
	})

	err := streams.Apply(dps, maybe.Aggregate)
	require.NoError(t, err)

	require.Equal(t, want, dps.dps)
}

func generate(n int) *Data {
	id, ndp := random.Sum().Stream()
	dps := Data{id: id, dps: make([]data.Number, n)}
	for i := range dps.dps {
		dp := ndp.Clone()
		dp.SetIntValue(int64(i))
		dps.dps[i] = dp
	}
	return dps
	return &dps
}

type Data struct {
@@ -119,6 +123,16 @@ func (l Data) Ident() metrics.Ident {
	return l.id.Metric()
}

func (l *Data) Filter(expr func(data.Number) bool) {
	var next []data.Number
	for _, dp := range l.dps {
		if expr(dp) {
			next = append(next, dp)
		}
	}
	l.dps = next
}

type aggr func(streams.Ident, data.Number) (data.Number, error)

func (a aggr) Aggregate(id streams.Ident, dp data.Number) (data.Number, error) {
@@ -29,6 +29,7 @@ func TestFaults(t *testing.T) {
		Pre  func(Map, identity.Stream, data.Number) error
		Bad  func(Map, identity.Stream, data.Number) error
		Err  error
		Want error
	}

	sum := random.Sum()
@@ -87,7 +88,8 @@
				dp.SetTimestamp(ts(20))
				return dps.Store(id, dp)
			},
			Err: streams.ErrLimit(1),
			Err:  streams.ErrLimit(1),
			Want: streams.Drop, // we can't ignore being at limit, we need to drop the entire stream for this request
		},
		{
			Name: "evict",
@@ -130,7 +132,7 @@
			require.Equal(t, c.Err, err)

			err = c.Bad(onf, id, dp.Clone())
			require.NoError(t, err)
			require.Equal(t, c.Want, err)
		})
	}
}
@@ -174,6 +174,8 @@ func (f Faults[T]) Store(id streams.Ident, v T) error {
		inc(f.dps.dropped, reason("out-of-order"))
	case errors.As(err, &limit):
		inc(f.dps.dropped, reason("stream-limit"))
		// no space to store stream, drop it instead of failing silently
		return streams.Drop
	case errors.As(err, &evict):
		inc(f.streams.evicted)
	case errors.As(err, &gap):
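Returning `streams.Drop` here is the crux of the fix: the drop is still counted via `dps_dropped{reason="stream-limit"}`, but instead of being swallowed the condition now propagates up to `Apply`, which removes the untracked stream's datapoint before anything can attempt a `CopyTo` on it.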
4 changes: 2 additions & 2 deletions processor/deltatocumulativeprocessor/processor.go
@@ -138,7 +138,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	case pmetric.MetricTypeSum:
		sum := m.Sum()
		if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
			err := streams.Aggregate(metrics.Sum(m), p.sums.aggr)
			err := streams.Apply(metrics.Sum(m), p.sums.aggr.Aggregate)
			errs = errors.Join(errs, err)
			sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
		}
@@ -147,7 +147,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	case pmetric.MetricTypeExponentialHistogram:
		expo := m.ExponentialHistogram()
		if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
			err := streams.Aggregate(metrics.ExpHistogram(m), p.expo.aggr)
			err := streams.Apply(metrics.ExpHistogram(m), p.expo.aggr.Aggregate)
			errs = errors.Join(errs, err)
			expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
		}
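Both call sites now pass the aggregator's `Aggregate` method as a plain function value, so `Apply` owns iteration, filtering, and error collection, and sums and exponential histograms share identical drop semantics.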