Skip to content

Commit

Permalink
[pkg/stanza] Fix operator sorting internal to stanza now that its int…
Browse files Browse the repository at this point in the history
…egrated (open-telemetry#10288)

Fix operator sorting internal to stanza now that its integrated
  • Loading branch information
dehaansa committed May 24, 2022
1 parent 62d1641 commit bee035e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- `mongodbreceiver`: Fix issue where receiver startup could hang (#10111)
- `transformprocessor`: Fix issue where metric.aggregation_temporality and metric.is_monotic were not actually gettable or settable (#10197)
- `podmanreceiver`: Container Stats Error structure (#9397)
- `pkg/stanza`: pipeline.Operators() will return a consistently ordered list of operators whenever possible (#9761)

## v0.51.0

Expand Down
9 changes: 9 additions & 0 deletions pkg/stanza/pipeline/directed.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func (p *DirectedPipeline) Render() ([]byte, error) {
// Operators returns a slice of operators that make up the pipeline graph
func (p *DirectedPipeline) Operators() []operator.Operator {
operators := make([]operator.Operator, 0)
if nodes, err := topo.Sort(p.Graph); err == nil {
for _, node := range nodes {
operators = append(operators, node.(OperatorNode).Operator())
}
return operators
}

// If for some unexpected reason an Unorderable error is returned,
// when using topo.Sort, return the list without ordering
nodes := p.Graph.Nodes()
for nodes.Next() {
operators = append(operators, nodes.Node().(OperatorNode).Operator())
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/pipeline/directed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,5 @@ func TestPipelineOperators(t *testing.T) {
require.NoError(t, err)

operators := pipeline.Operators()
require.ElementsMatch(t, []operator.Operator{mockOperator1, mockOperator2, mockOperator3}, operators)
require.Equal(t, []operator.Operator{mockOperator1, mockOperator2, mockOperator3}, operators)
}
2 changes: 1 addition & 1 deletion processor/logstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require (
go.opentelemetry.io/collector v0.51.1-0.20220519211145-c56d20e9e0af
go.opentelemetry.io/collector/pdata v0.51.1-0.20220519211145-c56d20e9e0af
go.uber.org/zap v1.21.0
gonum.org/v1/gonum v0.11.0
)

require (
Expand All @@ -36,6 +35,7 @@ require (
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.46.2 // indirect
google.golang.org/protobuf v1.28.0 // indirect
Expand Down
11 changes: 3 additions & 8 deletions processor/logstransformprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph/topo"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -98,15 +97,11 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
}

ltp.pipe = pipe

orderedNodes, err := topo.Sort(pipe.Graph)
if err != nil {
return err
}
if len(orderedNodes) == 0 {
pipelineOperators := pipe.Operators()
if len(pipelineOperators) == 0 {
return errors.New("processor requires at least one operator to be configured")
}
ltp.firstOperator = orderedNodes[0].(pipeline.OperatorNode).Operator()
ltp.firstOperator = pipelineOperators[0]

wkrCount := int(math.Max(1, float64(runtime.NumCPU())))
if baseCfg.Converter.WorkerCount > 0 {
Expand Down

0 comments on commit bee035e

Please sign in to comment.