Skip to content

Commit

Permalink
[ resource generation processor] add new processor skeleton and confi…
Browse files Browse the repository at this point in the history
…guration examples (open-telemetry#3266)

* Add resource generation processor skeleton and configuration examples

Signed-off-by: Rayhan Hossain <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/factory.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/factory.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Anthony Mirabella <[email protected]>

* Update processor/metricsgenerationprocessor/config.go

Co-authored-by: Anthony Mirabella <[email protected]>

* Rename generation_rules to rules

Signed-off-by: Rayhan Hossain <[email protected]>

* Use direct lookp map for OperationTypes

Signed-off-by: Rayhan Hossain <[email protected]>

* Address minor PR feedbacks

Signed-off-by: Rayhan Hossain <[email protected]>

* Update error message

Signed-off-by: Rayhan Hossain <[email protected]>

* nit: refactoring with inline method

Signed-off-by: Rayhan Hossain <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Min Xia <[email protected]>

* Use Config.Validator() for the configuration validation

Signed-off-by: Rayhan Hossain <[email protected]>

* Rebased and updated go version for the processor

Signed-off-by: Rayhan Hossain <[email protected]>

* Update config field name following PR feedbacks

Signed-off-by: Rayhan Hossain <[email protected]>

* Rename fields and make constant private fields

Signed-off-by: Rayhan Hossain <[email protected]>

* Rename config variable to match with yaml field

Signed-off-by: Rayhan Hossain <[email protected]>

* Add experimental_ prefix to the processor name

Signed-off-by: Rayhan Hossain <[email protected]>

* Update processor/metricsgenerationprocessor/README.md

Co-authored-by: Punya Biswal <[email protected]>

Co-authored-by: Anthony Mirabella <[email protected]>
Co-authored-by: Min Xia <[email protected]>
Co-authored-by: Punya Biswal <[email protected]>
  • Loading branch information
4 people committed May 17, 2021
1 parent b10b981 commit d31b3ec
Show file tree
Hide file tree
Showing 18 changed files with 2,417 additions and 0 deletions.
1 change: 1 addition & 0 deletions processor/metricsgenerationprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
64 changes: 64 additions & 0 deletions processor/metricsgenerationprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Metrics Generation Processor
**Status: under development; Not recommended for production usage.**

Supported pipeline types: metrics

## Description

The metrics generation processor (`experimental_metricsgenerationprocessor`) can be used to create new metrics using existing metrics following a given rule. Currently it supports following two approaches for creating a new metric.

1. It can create a new metric from two existing metrics by applying one of the folliwing arithmetic operations: add, subtract, multiply, divide and percent. One use case is to calculate the `pod.memory.utilization` metric like the following equation-
`pod.memory.utilization` = (`pod.memory.usage.bytes` / `node.memory.limit`)
1. It can create a new metric by scaling the value of an existing metric with a given constant number. One use case is to convert `pod.memory.usage` metric values from Megabytes to Bytes (multiply the existing metric's value by 1,048,576)

## Configuration

Configuration is specified through a list of generation rules. Generation rules find the metrics which
match the given metric names and apply the specified operation to those metrics.

```yaml
processors:
# processor name: experimental_metricsgeneration
experimental_metricsgeneration:

# specify the metric generation rules
rules:
# Name of the new metric. This is a required field.
- name: <new_metric_name>

# type describes how the new metric will be generated. It can be one of `calculate` or `scale`. calculate generates a metric applying the given operation on two operand metrics. scale operates only on operand1 metric to generate the new metric.
type: {calculate, scale}

# This is a required field.
metric1: <first_operand_metric>

# This field is required only if the type is "calculate".
metric2: <second_operand_metric>

# Operation specifies which arithmetic operation to apply. It must be one of the five supported operations.
operation: {add, subtract, multiply, divide, percent}
```

## Example Configurations

### Create a new metric using two existing metrics
```yaml
# create pod.cpu.utilized following (pod.cpu.usage / node.cpu.limit)
rules:
- name: pod.cpu.utilized
type: calculate
metric1: pod.cpu.usage
metric2: node.cpu.limit
operation: divide
```

### Create a new metric scaling the value of an existing metric
```yaml
# create pod.memory.usage.bytes from pod.memory.usage.megabytes
rules:
- name: pod.memory.usage.bytes
type: scale
metric1: pod.memory.usage.megabytes
operation: multiply
scale_by: 1048576
```
178 changes: 178 additions & 0 deletions processor/metricsgenerationprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metricsgenerationprocessor

import (
"fmt"
"sort"

"go.opentelemetry.io/collector/config"
)

const (
// nameFieldName is the mapstructure field name for name field
nameFieldName = "name"

// typeFieldName is the mapstructure field name for Type field
typeFieldName = "type"

// metric1FieldName is the mapstructure field name for Metric1 field
metric1FieldName = "metric1"

// metric2FieldName is the mapstructure field name for Metric2 field
metric2FieldName = "metric2"

// scaleByFieldName is the mapstructure field name for ScaleBy field
scaleByFieldName = "scale_by"

// operationFieldName is the mapstructure field name for Operation field
operationFieldName = "operation"
)

// Config defines the configuration for the processor.
type Config struct {
*config.ProcessorSettings `mapstructure:"-"`

// Set of rules for generating new metrics
Rules []Rule `mapstructure:"rules"`
}

type Rule struct {
// Name of the new metric being generated. This is a required field.
Name string `mapstructure:"name"`

// The rule type following which the new metric will be generated. This is a required field.
Type GenerationType `mapstructure:"type"`

// First operand metric to use in the calculation. This is a required field.
Metric1 string `mapstructure:"metric1"`

// Second operand metric to use in the calculation. A required field if the type is calculate.
Metric2 string `mapstructure:"metric2"`

// The arithmetic operation to apply for the calculation. This is a required field.
Operation OperationType `mapstructure:"operation"`

// A constant number by which the first operand will be scaled. A required field if the type is scale.
ScaleBy float64 `mapstructure:"scale_by"`
}

type GenerationType string

const (

// Generates a new metric applying an arithmatic operation with two operands
calculate GenerationType = "calculate"

// Generates a new metric scaling the value of s given metric with a provided constant
scale GenerationType = "scale"
)

var generationTypes = map[GenerationType]struct{}{calculate: {}, scale: {}}

func (gt GenerationType) isValid() bool {
_, ok := generationTypes[gt]
return ok
}

var generationTypeKeys = func() []string {
ret := make([]string, len(generationTypes))
i := 0
for k := range generationTypes {
ret[i] = string(k)
i++
}
sort.Strings(ret)
return ret
}

type OperationType string

const (

// Adds two operands
add OperationType = "add"

// subtract the second operand from the first operand
subtract OperationType = "subtract"

// multiply two operands
multiply OperationType = "multiply"

// Divides the first operand with the second operand
divide OperationType = "divide"

// Calculates the percentage: (Metric1 / Metric2) * 100
percent OperationType = "percent"
)

var operationTypes = map[OperationType]struct{}{
add: {},
subtract: {},
multiply: {},
divide: {},
percent: {},
}

func (ot OperationType) isValid() bool {
_, ok := operationTypes[ot]
return ok
}

var operationTypeKeys = func() []string {
ret := make([]string, len(operationTypes))
i := 0
for k := range operationTypes {
ret[i] = string(k)
i++
}
sort.Strings(ret)
return ret
}

// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
for _, rule := range config.Rules {
if rule.Name == "" {
return fmt.Errorf("missing required field %q", nameFieldName)
}

if rule.Type == "" {
return fmt.Errorf("missing required field %q", typeFieldName)
}

if !rule.Type.isValid() {
return fmt.Errorf("%q must be in %q", typeFieldName, generationTypeKeys())
}

if rule.Metric1 == "" {
return fmt.Errorf("missing required field %q", metric1FieldName)
}

if rule.Type == calculate && rule.Metric2 == "" {
return fmt.Errorf("missing required field %q for generation type %q", metric2FieldName, calculate)
}

if rule.Type == scale && rule.ScaleBy <= 0 {
return fmt.Errorf("field %q required to be greater than 0 for generation type %q", scaleByFieldName, scale)
}

if rule.Operation != "" && !rule.Operation.isValid() {
return fmt.Errorf("%q must be in %q", operationFieldName, operationTypeKeys())
}
}
return nil
}
142 changes: 142 additions & 0 deletions processor/metricsgenerationprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metricsgenerationprocessor

import (
"fmt"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadingFullConfig(t *testing.T) {
tests := []struct {
configFile string
filterName string
expCfg *Config
}{
{
configFile: "config_full.yaml",
filterName: "experimental_metricsgeneration",
expCfg: &Config{
ProcessorSettings: config.NewProcessorSettings(typeStr),
Rules: []Rule{
{
Name: "new_metric",
Type: "calculate",
Metric1: "metric1",
Metric2: "metric2",
Operation: "percent",
},
{
Name: "new_metric",
Type: "scale",
Metric1: "metric1",
ScaleBy: 1000,
Operation: "multiply",
},
},
},
},
}

for _, test := range tests {
t.Run(test.filterName, func(t *testing.T) {

factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[config.Type(typeStr)] = factory
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", test.configFile), factories)
assert.NoError(t, err)
require.NotNil(t, config)

cfg := config.Processors[test.filterName]
assert.Equal(t, test.expCfg, cfg)
})
}
}

func TestValidateConfig(t *testing.T) {
tests := []struct {
configName string
succeed bool
errorMessage string
}{
{
configName: "config_full.yaml",
succeed: true,
},
{
configName: "config_missing_new_metric.yaml",
succeed: false,
errorMessage: fmt.Sprintf("missing required field %q", nameFieldName),
},
{
configName: "config_missing_type.yaml",
succeed: false,
errorMessage: fmt.Sprintf("missing required field %q", typeFieldName),
},
{
configName: "config_invalid_generation_type.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q must be in %q", typeFieldName, generationTypeKeys()),
},
{
configName: "config_missing_operand1.yaml",
succeed: false,
errorMessage: fmt.Sprintf("missing required field %q", metric1FieldName),
},
{
configName: "config_missing_operand2.yaml",
succeed: false,
errorMessage: fmt.Sprintf("missing required field %q for generation type %q", metric2FieldName, calculate),
},
{
configName: "config_missing_scale_by.yaml",
succeed: false,
errorMessage: fmt.Sprintf("field %q required to be greater than 0 for generation type %q", scaleByFieldName, scale),
},
{
configName: "config_invalid_operation.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q must be in %q", operationFieldName, operationTypeKeys()),
},
}

for _, test := range tests {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
t.Run(test.configName, func(t *testing.T) {
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", test.configName), factories)
if test.succeed {
assert.NotNil(t, config)
assert.NoError(t, err)
} else {
assert.EqualError(t, err, fmt.Sprintf("processor %q has invalid configuration: %s", typeStr, test.errorMessage))
}
})

}
}
Loading

0 comments on commit d31b3ec

Please sign in to comment.