Skip to content

Commit

Permalink
[spanmetrics] Move spanmetrics cfg validation to ConfigValidator#Vali…
Browse files Browse the repository at this point in the history
…date. (open-telemetry#18530)

Move spanmetrics components configuration validations to ConfigValidator#Validate.
  • Loading branch information
kovrus committed Feb 24, 2023
1 parent c9b3aba commit 99c3d98
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 258 deletions.
50 changes: 50 additions & 0 deletions connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package spanmetricsconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector"

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"
)
Expand Down Expand Up @@ -70,6 +72,25 @@ type Config struct {
Namespace string `mapstructure:"namespace"`
}

var _ component.ConfigValidator = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (c Config) Validate() error {
err := validateDimensions(c.Dimensions, dropSanitizationGate.IsEnabled())
if err != nil {
return err
}

if c.DimensionsCacheSize <= 0 {
return fmt.Errorf(
"invalid cache size: %v, the maximum number of the items in the cache should be positive",
c.DimensionsCacheSize,
)
}

return nil
}

// GetAggregationTemporality converts the string value given in the config into a AggregationTemporality.
// Returns cumulative, unless delta is correctly specified.
func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
Expand All @@ -78,3 +99,32 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
}
return pmetric.AggregationTemporalityCumulative
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions. Considering
// the usage of Prometheus related exporters, we also validate the dimensions after sanitization.
func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error {
labelNames := make(map[string]struct{})
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} {
labelNames[key] = struct{}{}
labelNames[sanitize(key, skipSanitizeLabel)] = struct{}{}
}
labelNames[spanNameKey] = struct{}{}

for _, key := range dimensions {
if _, ok := labelNames[key.Name]; ok {
return fmt.Errorf("duplicate dimension name %s", key.Name)
}
labelNames[key.Name] = struct{}{}

sanitizedName := sanitize(key.Name, skipSanitizeLabel)
if sanitizedName == key.Name {
continue
}
if _, ok := labelNames[sanitizedName]; ok {
return fmt.Errorf("duplicate dimension name %s after sanitization", sanitizedName)
}
labelNames[sanitizedName] = struct{}{}
}

return nil
}
74 changes: 74 additions & 0 deletions connector/spanmetricsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,77 @@ func TestGetAggregationTemporality(t *testing.T) {
cfg = &Config{}
assert.Equal(t, pmetric.AggregationTemporalityCumulative, cfg.GetAggregationTemporality())
}

func TestValidateDimensions(t *testing.T) {
for _, tc := range []struct {
name string
dimensions []Dimension
expectedErr string
skipSanitizeLabel bool
}{
{
name: "no additional dimensions",
dimensions: []Dimension{},
},
{
name: "no duplicate dimensions",
dimensions: []Dimension{
{Name: "http.service_name"},
{Name: "http.status_code"},
},
},
{
name: "duplicate dimension with reserved labels",
dimensions: []Dimension{
{Name: "service.name"},
},
expectedErr: "duplicate dimension name service.name",
},
{
name: "duplicate dimension with reserved labels after sanitization",
dimensions: []Dimension{
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions",
dimensions: []Dimension{
{Name: "service_name"},
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions after sanitization",
dimensions: []Dimension{
{Name: "http.status_code"},
{Name: "http!status_code"},
},
expectedErr: "duplicate dimension name http_status_code after sanitization",
},
{
name: "we skip the case if the dimension name is the same after sanitization",
dimensions: []Dimension{
{Name: "http_status_code"},
},
},
{
name: "duplicate dimension",
dimensions: []Dimension{
{Name: "status_code"},
},
expectedErr: "duplicate dimension name status_code",
},
} {
t.Run(tc.name, func(t *testing.T) {
tc.skipSanitizeLabel = false
err := validateDimensions(tc.dimensions, tc.skipSanitizeLabel)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
40 changes: 0 additions & 40 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package spanmetricsconnector // import "github.com/open-telemetry/opentelemetry-
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -142,16 +141,6 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
bounds = mapDurationsToMillis(pConfig.LatencyHistogramBuckets)
}

if err := validateDimensions(pConfig.Dimensions, pConfig.skipSanitizeLabel); err != nil {
return nil, err
}

if pConfig.DimensionsCacheSize <= 0 {
return nil, fmt.Errorf(
"invalid cache size: %v, the maximum number of the items in the cache should be positive",
pConfig.DimensionsCacheSize,
)
}
metricKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
if err != nil {
return nil, err
Expand Down Expand Up @@ -185,35 +174,6 @@ func mapDurationsToMillis(vs []time.Duration) []float64 {
return vsm
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions. Considering
// the usage of Prometheus related exporters, we also validate the dimensions after sanitization.
func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error {
labelNames := make(map[string]struct{})
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} {
labelNames[key] = struct{}{}
labelNames[sanitize(key, skipSanitizeLabel)] = struct{}{}
}
labelNames[spanNameKey] = struct{}{}

for _, key := range dimensions {
if _, ok := labelNames[key.Name]; ok {
return fmt.Errorf("duplicate dimension name %s", key.Name)
}
labelNames[key.Name] = struct{}{}

sanitizedName := sanitize(key.Name, skipSanitizeLabel)
if sanitizedName == key.Name {
continue
}
if _, ok := labelNames[sanitizedName]; ok {
return fmt.Errorf("duplicate dimension name %s after sanitization", sanitizedName)
}
labelNames[sanitizedName] = struct{}{}
}

return nil
}

// Start implements the component.Component interface.
func (p *connectorImp) Start(ctx context.Context, _ component.Host) error {
p.logger.Info("Starting spanmetricsconnector")
Expand Down
97 changes: 0 additions & 97 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,88 +391,6 @@ func TestBuildKeyWithDimensions(t *testing.T) {
}
}

func TestConnectorDuplicateDimensions(t *testing.T) {
// Prepare
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
// Duplicate dimension with reserved label after sanitization.
cfg.Dimensions = []Dimension{
{Name: "status_code"},
}

// Test
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
assert.Error(t, err)
assert.Nil(t, c)
}

func TestValidateDimensions(t *testing.T) {
for _, tc := range []struct {
name string
dimensions []Dimension
expectedErr string
skipSanitizeLabel bool
}{
{
name: "no additional dimensions",
dimensions: []Dimension{},
},
{
name: "no duplicate dimensions",
dimensions: []Dimension{
{Name: "http.service_name"},
{Name: "http.status_code"},
},
},
{
name: "duplicate dimension with reserved labels",
dimensions: []Dimension{
{Name: "service.name"},
},
expectedErr: "duplicate dimension name service.name",
},
{
name: "duplicate dimension with reserved labels after sanitization",
dimensions: []Dimension{
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions",
dimensions: []Dimension{
{Name: "service_name"},
{Name: "service_name"},
},
expectedErr: "duplicate dimension name service_name",
},
{
name: "duplicate additional dimensions after sanitization",
dimensions: []Dimension{
{Name: "http.status_code"},
{Name: "http!status_code"},
},
expectedErr: "duplicate dimension name http_status_code after sanitization",
},
{
name: "we skip the case if the dimension name is the same after sanitization",
dimensions: []Dimension{
{Name: "http_status_code"},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
tc.skipSanitizeLabel = false
err := validateDimensions(tc.dimensions, tc.skipSanitizeLabel)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}

func TestSanitize(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.Equal(t, "", sanitize("", cfg.skipSanitizeLabel), "")
Expand Down Expand Up @@ -862,21 +780,6 @@ func newConnectorImp(mcon consumer.Metrics, defaultNullValue *pcommon.Value, tem
}
}

func TestDuplicateDimensions(t *testing.T) {
// Prepare
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
// Duplicate dimension with reserved label after sanitization.
cfg.Dimensions = []Dimension{
{Name: "status_code"},
}

// Test
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
assert.Error(t, err)
assert.Nil(t, c)
}

func TestUpdateExemplars(t *testing.T) {
// ----- conditions -------------------------------------------------------
factory := NewFactory()
Expand Down
50 changes: 50 additions & 0 deletions processor/spanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package spanmetricsprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor"

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pmetric"
)
Expand Down Expand Up @@ -73,6 +75,8 @@ type Config struct {
Namespace string `mapstructure:"namespace"`
}

var _ component.ConfigValidator = (*Config)(nil)

// GetAggregationTemporality converts the string value given in the config into a AggregationTemporality.
// Returns cumulative, unless delta is correctly specified.
func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
Expand All @@ -81,3 +85,49 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
}
return pmetric.AggregationTemporalityCumulative
}

// Validate checks if the processor configuration is valid
func (c Config) Validate() error {
err := validateDimensions(c.Dimensions, dropSanitizationGate.IsEnabled())
if err != nil {
return err
}

if c.DimensionsCacheSize <= 0 {
return fmt.Errorf(
"invalid cache size: %v, the maximum number of the items in the cache should be positive",
c.DimensionsCacheSize,
)
}

return nil
}

// validateDimensions checks duplicates for reserved dimensions and additional dimensions. Considering
// the usage of Prometheus related exporters, we also validate the dimensions after sanitization.
func validateDimensions(dimensions []Dimension, skipSanitizeLabel bool) error {
labelNames := make(map[string]struct{})
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey} {
labelNames[key] = struct{}{}
labelNames[sanitize(key, skipSanitizeLabel)] = struct{}{}
}
labelNames[operationKey] = struct{}{}

for _, key := range dimensions {
if _, ok := labelNames[key.Name]; ok {
return fmt.Errorf("duplicate dimension name %s", key.Name)
}
labelNames[key.Name] = struct{}{}

sanitizedName := sanitize(key.Name, skipSanitizeLabel)
if sanitizedName == key.Name {
continue
}
if _, ok := labelNames[sanitizedName]; ok {
return fmt.Errorf("duplicate dimension name %s after sanitization", sanitizedName)
}
labelNames[sanitizedName] = struct{}{}
}

return nil
}
Loading

0 comments on commit 99c3d98

Please sign in to comment.