Skip to content

Commit

Permalink
[schema processor] Part 3 - Modifiers and Revisions open-telemetry#12147
Browse files Browse the repository at this point in the history
 (open-telemetry#17020)

* [schema processor] Modifier

A modifier allows mutating a signal from a previous version to the
current one and vice versa.
[schema processor] Adding revisions

A revision handles applying the modifiers to incoming signals and
converting a signal to the next version.

* Adding changes

* Adding migrate package.

Following patterns similar to databases where you'd apply changes
and rollback. the Migrate packages follows these designs to help make it
clear what it is trying to do.

* Removing modiferies to avoid confusion

* Fixing up naming issues

* Adding checks in for building a revision

* Fixing up replaces

* Removing transform files

* Fixing naming

* Fixing up conflict

* Fixing typo

* Updating naming

* Fixing up ast usage

* FIxing up generated mod files

* Fixing up license headers
  • Loading branch information
MovieStoreGuy committed May 23, 2023
1 parent 44b075a commit 085abb9
Show file tree
Hide file tree
Showing 15 changed files with 1,363 additions and 128 deletions.
3 changes: 2 additions & 1 deletion processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ require (
go.opentelemetry.io/collector/confmap v0.78.1
go.opentelemetry.io/collector/consumer v0.78.1
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.opentelemetry.io/otel/schema v0.0.4
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
)

Expand Down Expand Up @@ -39,7 +41,6 @@ require (
go.opentelemetry.io/otel/metric v0.38.1 // indirect
go.opentelemetry.io/otel/trace v1.15.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions processor/schemaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type Resource interface {
Resource() pcommon.Resource
}

// Signal represents a subset of incoming pdata
// NamedSignal represents a subset of incoming pdata
// that can be updated using the schema processor
type Signal interface {
type NamedSignal interface {
Name() string

SetName(name string)
Expand All @@ -35,7 +35,15 @@ var (
_ Resource = (*pmetric.ResourceMetrics)(nil)
_ Resource = (*ptrace.ResourceSpans)(nil)

_ Signal = (*pmetric.Metric)(nil)
_ Signal = (*ptrace.Span)(nil)
_ Signal = (*ptrace.SpanEvent)(nil)
_ NamedSignal = (*pmetric.Metric)(nil)
_ NamedSignal = (*ptrace.Span)(nil)
_ NamedSignal = (*ptrace.SpanEvent)(nil)
)

// AttributeKey is a type alias of string to help
// make clear what the strings being stored represent
type AttributeKey = string

// SignalName is a type alias of a string to help
// make clear what a type field is being used for.
type SignalName = string
16 changes: 16 additions & 0 deletions processor/schemaprocessor/internal/alias/alias_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package alias

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTypeAliases(t *testing.T) {
t.Parallel()

assert.IsType(t, (*string)(nil), (*AttributeKey)(nil))
assert.IsType(t, (*string)(nil), (*SignalName)(nil))
}
113 changes: 113 additions & 0 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/schema/v1.0/ast"
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
for k, v := range mappings {
attr.updates[k] = v
attr.rollback[v] = k
}
return attr
}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
)
attrs.Range(func(k string, v pcommon.Value) bool {
var (
key string
matched bool
)
switch ss {
case StateSelectorApply:
key, matched = a.updates[k]
case StateSelectorRollback:
key, matched = a.rollback[k]
}
if matched {
k, updated[key] = key, struct{}{}
} else {
// TODO: Since the spec hasn't decided the behavior on what
// should happen on a name conflict, this will assume
// the rewrite has priority and will set it to the original
// entry's value, not the existing value.
if _, overridden := updated[k]; overridden {
errs = multierr.Append(errs, fmt.Errorf("value %q already exists", k))
return true
}
}
v.CopyTo(results.PutEmpty(k))
return true
})
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
Loading

0 comments on commit 085abb9

Please sign in to comment.