Skip to content

Commit

Permalink
[processor/deltatocumulative]: bugfix - process all samples (open-tel…
Browse files Browse the repository at this point in the history
…emetry#31349)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Due to an oversight, the processor stopped after the first sample. It
should process all available samples per stream.

**Testing:**
Tests have been added to verify:
- `Samples()` visits all samples
- `Aggregate()` edits all samples

**Issues**
Fixes
open-telemetry#31350
  • Loading branch information
sh0rez committed Feb 26, 2024
1 parent 166a7b4 commit 3c97c8f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-all-samples.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "bug_fix"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "deltatocumulativeprocessor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: due to an oversight, only the first sample of each stream was processed. now all samples are.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31350]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Aggregate[D data.Point[D]](m metrics.Data[D], aggr Aggregator[D]) error {
return true
}
next.CopyTo(dp)
return false
return true
})

return errs
Expand Down
44 changes: 44 additions & 0 deletions processor/deltatocumulativeprocessor/internal/streams/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package streams_test
import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
Expand Down Expand Up @@ -52,6 +54,42 @@ func BenchmarkSamples(b *testing.B) {
})
}

func TestSample(t *testing.T) {
const total = 1000
dps := generate(total)

// check that all samples are visited
seen := 0
streams.Samples[data.Number](dps)(func(id streams.Ident, dp data.Number) bool {
require.Equal(t, dps.id, id)
require.Equal(t, dps.dps[seen], dp)
seen++
return true
})

require.Equal(t, total, seen)
}

func TestAggregate(t *testing.T) {
const total = 1000
dps := generate(total)

// inv aggregator inverts each sample
inv := aggr(func(id streams.Ident, n data.Number) (data.Number, error) {
dp := n.Clone()
dp.SetIntValue(-dp.IntValue())
return dp, nil
})

err := streams.Aggregate(dps, inv)
require.NoError(t, err)

// check that all samples are inverted
for i := 0; i < total; i++ {
require.Equal(t, int64(-i), dps.dps[i].IntValue())
}
}

func generate(n int) Data {
id, ndp := random.Sum().Stream()
dps := Data{id: id, dps: make([]data.Number, n)}
Expand Down Expand Up @@ -79,3 +117,9 @@ func (l Data) Len() int {
func (l Data) Ident() metrics.Ident {
return l.id.Metric()
}

type aggr func(streams.Ident, data.Number) (data.Number, error)

func (a aggr) Aggregate(id streams.Ident, dp data.Number) (data.Number, error) {
return a(id, dp)
}

0 comments on commit 3c97c8f

Please sign in to comment.