Skip to content

Commit

Permalink
[processor/transform] Add functions for conversion of scalar metric t…
Browse files Browse the repository at this point in the history
…ypes (open-telemetry#10255)

* add conversion functions for scalar metrics

* fix failing factory test

* lint

* revert EqualsValue

* update registry comment

* changelog

* move Bool rule down under string

* Rename metric only functions header

Co-authored-by: Tyler Helmuth <[email protected]>

* add example in readme

* capitialize Sum

* Add disclaimer to README that functions may break semantics

* use new testhelper package for test pointers from literals

* move changelog entry to unreleased

Co-authored-by: Tyler Helmuth <[email protected]>
  • Loading branch information
BinaryFissionGames and TylerHelmuth committed Jun 1, 2022
1 parent 04a92a2 commit bb685fa
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

- `tailsamplingprocessor`: Add support for string invert matching to `and` policy (#9553)
- `mezemoexporter`: Add user agent string to outgoing HTTP requests (#10470)
- `transformprocessor`: Add functions for conversion of scalar metric types (`gauge_to_sum` and `sum_to_gauge`) (#10255)

### 🧰 Bug fixes 🧰

Expand Down
11 changes: 11 additions & 0 deletions processor/transformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ the fields specified by the list of strings. e.g., `keep_keys(attributes, "http.

- `replace_all_matches(target, pattern, replacement)` - `target` is a path expression to a map type field, `pattern` is a string following [filepath.Match syntax](https://pkg.go.dev/path/filepath#Match), and `replacement` is a string. Each string value in `target` that matches `pattern` will get replaced with `replacement`. e.g., `replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")`

Metric only functions:
- `convert_sum_to_gauge()` - Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".
**NOTE:** This function may cause a metric to break semantics for [Gauge metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#gauge). Use at your own risk.

- `convert_gauge_to_sum(aggregation_temporality, is_monotonic)` - `aggregation_temporality` specifies the resultant metric's aggregation temporality. `aggregation_temporality` may be `"cumulative"` or `"delta"`. `is_monotonic` specifies the resultant metric's monotonicity. `is_monotonic` is a boolean. Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".
**NOTE:** This function may cause a metric to break semantics for [Sum metrics](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/datamodel.md#sums). Use at your own risk.

Supported where operations:
- `==` - matches telemetry where the values are equal to each other
- `!=` - matches telemetry where the values are not equal to each other
Expand Down Expand Up @@ -70,6 +77,8 @@ processors:
- limit(attributes, 100)
- truncate_all(attributes, 4096)
- truncate_all(resource.attributes, 4096)
- convert_sum_to_gauge() where metric.name == "system.processes.count"
- convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"
logs:
queries:
- set(severity_text, "FAIL") where body == "request failed"
Expand Down Expand Up @@ -108,6 +117,8 @@ All metrics and their data points
4) Limit all data point attributes such that each data point has no more than 100 attributes.
6) Truncate all data point attributes such that no string value has more than 4096 characters.
7) Truncate all resource attributes such that no string value has more than 4096 characters.
8) Convert all metrics with name `system.processes.count` from a Sum to Gauge.
9) Convert all metrics with name `prometheus_metric` from Gauge to a cumulative, non-monotonic Sum.

All logs

Expand Down
3 changes: 2 additions & 1 deletion processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/servicetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestLoadingConfig(t *testing.T) {
`keep_keys(attributes, "http.method", "http.path")`,
},

functions: traces.DefaultFunctions(),
functions: metrics.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{
Expand Down
3 changes: 2 additions & 1 deletion processor/transformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/logs"
Expand All @@ -50,7 +51,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
Metrics: SignalConfig{
Queries: []string{},

functions: traces.DefaultFunctions(),
functions: metrics.DefaultFunctions(),
},
Logs: SignalConfig{
Queries: []string{},
Expand Down
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/common/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ func NewFunctionCall(inv Invocation, functions map[string]interface{}, pathParse
return nil, fmt.Errorf("invalid argument at position %v, must be a string", i)
}
args = append(args, reflect.ValueOf(*argDef.String))
case "bool":
if argDef.Bool == nil {
return nil, fmt.Errorf("invalid argument at position %v, must be a bool", i)
}
args = append(args, reflect.ValueOf(bool(*argDef.Bool)))
}
}
val := reflect.ValueOf(f)
Expand Down
10 changes: 10 additions & 0 deletions processor/transformprocessor/internal/common/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ import (
"go.uber.org/multierr"
)

// Type for capturing booleans, see:
// https://github.com/alecthomas/participle#capturing-boolean-value
type Boolean bool

func (b *Boolean) Capture(values []string) error {
*b = values[0] == "true"
return nil
}

// ParsedQuery represents a parsed query. It is the entry point into the query DSL.
// nolint:govet
type ParsedQuery struct {
Expand Down Expand Up @@ -50,6 +59,7 @@ type Value struct {
String *string `| @String`
Float *float64 `| @Float`
Int *int64 `| @Int`
Bool *Boolean `| @("true" | "false")`
Path *Path `| @@ )`
}

Expand Down
34 changes: 34 additions & 0 deletions processor/transformprocessor/internal/common/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,40 @@ func Test_parse(t *testing.T) {
Condition: nil,
},
},
{
query: `convert_gauge_to_sum("cumulative", false)`,
expected: &ParsedQuery{
Invocation: Invocation{
Function: "convert_gauge_to_sum",
Arguments: []Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*Boolean)(testhelper.Boolp(false)),
},
},
},
Condition: nil,
},
},
{
query: `convert_gauge_to_sum("cumulative", true)`,
expected: &ParsedQuery{
Invocation: Invocation{
Function: "convert_gauge_to_sum",
Arguments: []Value{
{
String: testhelper.Strp("cumulative"),
},
{
Bool: (*Boolean)(testhelper.Boolp(true)),
},
},
},
Condition: nil,
},
},
}

for _, tt := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ func Floatp(f float64) *float64 {
func Intp(i int64) *int64 {
return &i
}

func Boolp(b bool) *bool {
return &b
}
77 changes: 75 additions & 2 deletions processor/transformprocessor/internal/metrics/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,83 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// registry is a map of names to functions for metrics pipelines
var registry = map[string]interface{}{
"convert_sum_to_gauge": convertSumToGauge,
"convert_gauge_to_sum": convertGaugeToSum,
}

func init() {
// Init metrics registry with default functions common to all signals
for k, v := range common.DefaultFunctions() {
registry[k] = v
}
}

func DefaultFunctions() map[string]interface{} {
// No metric-only functions yet.
return common.DefaultFunctions()
return registry
}

func convertSumToGauge() (common.ExprFunc, error) {
return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeSum {
return nil
}

dps := metric.Sum().DataPoints()

metric.SetDataType(pmetric.MetricDataTypeGauge)
// Setting the data type removed all the data points, so we must copy them back to the metric.
dps.CopyTo(metric.Gauge().DataPoints())

return nil
}, nil
}

func convertGaugeToSum(stringAggTemp string, monotonic bool) (common.ExprFunc, error) {
var aggTemp pmetric.MetricAggregationTemporality
switch stringAggTemp {
case "delta":
aggTemp = pmetric.MetricAggregationTemporalityDelta
case "cumulative":
aggTemp = pmetric.MetricAggregationTemporalityCumulative
default:
return nil, fmt.Errorf("unknown aggregation temporality: %s", stringAggTemp)
}

return func(ctx common.TransformContext) interface{} {
mtc, ok := ctx.(metricTransformContext)
if !ok {
return nil
}

metric := mtc.GetMetric()
if metric.DataType() != pmetric.MetricDataTypeGauge {
return nil
}

dps := metric.Gauge().DataPoints()

metric.SetDataType(pmetric.MetricDataTypeSum)
metric.Sum().SetAggregationTemporality(aggTemp)
metric.Sum().SetIsMonotonic(monotonic)

// Setting the data type removed all the data points, so we must copy them back to the metric.
dps.CopyTo(metric.Sum().DataPoints())

return nil
}, nil
}
Loading

0 comments on commit bb685fa

Please sign in to comment.