Skip to content

Commit

Permalink
Added routing exporter (open-telemetry#907)
Browse files Browse the repository at this point in the history
Closes open-telemetry/opentelemetry-collector#1260
Supersedes open-telemetry/opentelemetry-collector#1611 

**Testing:** unit tests + manual tests (see open-telemetry/opentelemetry-collector#1611)

**Documentation:** README included.

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Sep 10, 2020
1 parent 4f2f5cc commit 1b925a1
Show file tree
Hide file tree
Showing 16 changed files with 2,620 additions and 34 deletions.
4 changes: 4 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ updates:
directory: "/processor/resourcedetectionprocessor"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/processor/routingprocessor"
schedule:
interval: "weekly"
- package-ecosystem: "gomod"
directory: "/receiver/awsxrayreceiver"
schedule:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

## Unreleased

## 🚀 New components 🚀
- add initial docker stats receiver, without sourcing in top level components (#495)
- add initial jmx metrics extension structure, without sourcing in top level components (#740)
- `routing` processor for routing spans based on HTTP headers (#907)

## v0.9.0

Expand Down
74 changes: 40 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ contributors.
collector to take a limited sets of dependencies - so run `go mod` commands
as appropriate for your component.
- Implement the needed interface on your component by importing the appropriate
component from the core repo. Follow the pattern of existing components
regarding config and factory source files and tests.
component from the core repo. Follow the pattern of existing components
regarding config and factory source files and tests.
- Implement your component as appropriate. Provide end-to-end tests (or mock
backend/client as appropriate). Target is to get 80% or more of code
coverage.
Expand Down Expand Up @@ -139,38 +139,44 @@ Learn more about roles in the [community repository](https://github.com/open-tel

#### Exporters

| Exporter | Reviewer(s) |
| -------- | ----------- |
| alibabacloudlogserviceexporter | @shabicheng |
| awsxrayexporter | @kbrockhoff @anuraaga |
| azuremonitorexporter | @pcwiese |
| carbonexporter | @pjanotti |
| elasticexporter | @axw |
| honeycombexporter | @paulosman @lizthegrey |
| jaegerthrifthttpexporter | @jpkrohling @pavolloffay |
| kinesisexporter | @owais |
| newrelicexporter | @MrAlias |
| sapmexporter | @owais @dmitryax |
| sentryexporter | @AbhiPrasad |
| signalfxexporter | @pmcollins @asuresh4 |
| splunkhecexporter | @atoulme |
| stackdriverexporter | @nilebox @james-bebbington |
| Exporter | Reviewer(s) |
| ------------------------------ | -------------------------- |
| alibabacloudlogserviceexporter | @shabicheng |
| awsxrayexporter | @kbrockhoff @anuraaga |
| azuremonitorexporter | @pcwiese |
| carbonexporter | @pjanotti |
| elasticexporter | @axw |
| honeycombexporter | @paulosman @lizthegrey |
| jaegerthrifthttpexporter | @jpkrohling @pavolloffay |
| kinesisexporter | @owais |
| newrelicexporter | @MrAlias |
| sapmexporter | @owais @dmitryax |
| sentryexporter | @AbhiPrasad |
| signalfxexporter | @pmcollins @asuresh4 |
| splunkhecexporter | @atoulme |
| stackdriverexporter | @nilebox @james-bebbington |

#### Receivers

| Receiver | Reviewer(s) |
| -------- | ----------- |
| awsecscontainermetricsreceiver | @kbrockhoff @anuraaga |
| awsxrayreceiver | @kbrockhoff @anuraaga |
| carbonreceiver | @pjanotti |
| collectdreceiver | @owais |
| k8sclusterreceiver | @asuresh4 |
| kubeletstatsreceiver | @pmcollins @asuresh4 |
| prometheusexecreceiver | @keitwb @james-bebbington |
| receivercreator | @jrcamp |
| redisreceiver | @pmcollins @jrcamp |
| sapmreceiver | @owais |
| signalfxreceiver | @pjanotti @asuresh4 |
| simpleprometheusreceiver | @asuresh4 |
| statsdreceiver | @keitwb @jmacd |
| wavefrontreceiver | @pjanotti |
| Receiver | Reviewer(s) |
| ------------------------------ | ------------------------- |
| awsecscontainermetricsreceiver | @kbrockhoff @anuraaga |
| awsxrayreceiver | @kbrockhoff @anuraaga |
| carbonreceiver | @pjanotti |
| collectdreceiver | @owais |
| k8sclusterreceiver | @asuresh4 |
| kubeletstatsreceiver | @pmcollins @asuresh4 |
| prometheusexecreceiver | @keitwb @james-bebbington |
| receivercreator | @jrcamp |
| redisreceiver | @pmcollins @jrcamp |
| sapmreceiver | @owais |
| signalfxreceiver | @pjanotti @asuresh4 |
| simpleprometheusreceiver | @asuresh4 |
| statsdreceiver | @keitwb @jmacd |
| wavefrontreceiver | @pjanotti |

#### Processors

| Processor | Reviewer(s) |
| ------------------------------ | ------------------------- |
| routing | @jpkrohling |
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/collectdreceiver"
Expand Down Expand Up @@ -127,6 +128,7 @@ func components() (component.Factories, error) {
k8sprocessor.NewFactory(),
resourcedetectionprocessor.NewFactory(),
metricstransformprocessor.NewFactory(),
routingprocessor.NewFactory(),
}
for _, pr := range factories.Processors {
processors = append(processors, pr)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sp
replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor => ./processor/resourcedetectionprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor => ./processor/metricstransformprocessor/

replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor => ./processor/routingprocessor/
1 change: 1 addition & 0 deletions processor/routingprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
39 changes: 39 additions & 0 deletions processor/routingprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Routing processor

Routes traces to specific exporters.

This processor will read a header from the incoming HTTP request (gRPC or plain HTTP) and direct the trace information to specific exporters based on the attribute's value.

This processor *does not* let traces to continue through the pipeline and will emit a warning in case other processor(s) are defined after this one. Similarly, exporters defined as part of the pipeline are not authoritative: if you add an exporter to the pipeline, make sure you add it to this processor *as well*, otherwise it won't be used at all. All exporters defined as part of this processor *must also* be defined as part of the pipeline's exporters.

Given that this processor depends on information provided by the client via HTTP headers, processors that aggregate data like `batch` or `groupbytrace` should not be used when this processor is part of the pipeline.

The following settings are required:

- `from_attribute`: contains the HTTP header name to look up the route's value. Only the OTLP exporter has been tested in connection with the OTLP gRPC Receiver, but any other gRPC receiver should work fine, as long as the client sends the specified HTTP header.
- `table`: the routing table for this processor.
- `table.value`: a possible value for the attribute specified under FromAttribute.
- `table.exporters`: the list of exporters to use when the value from the FromAttribute field matches this table item.

The following settings can be optionally configured:

- `default_exporters` contains the list of exporters to use when a more specific record can't be found in the routing table.

Example:

```yaml
processors:
routing:
from_attribute: X-Tenant
default_exporters: jaeger
table:
- value: acme
exporters: [jaeger/acme]
exporters:
jaeger:
endpoint: localhost:14250
jaeger/acme:
endpoint: localhost:24250
```

The full list of settings exposed for this processor are documented [here](./config.go) with detailed sample configuration [here](./testdata/config.yaml).
52 changes: 52 additions & 0 deletions processor/routingprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routingprocessor

import (
"go.opentelemetry.io/collector/config/configmodels"
)

// Config defines configuration for the Routing processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// DefaultExporters contains the list of exporters to use when a more specific record can't be found in the routing table.
// Optional.
DefaultExporters []string `mapstructure:"default_exporters"`

// FromAttribute contains the attribute name to look up the route value. This attribute should be part of the context propagated
// down from the previous receivers and/or processors. If all the receivers and processors are propagating the entire context correctly,
// this could be the HTTP/gRPC header from the original request/RPC. Typically, aggregation processors (batch, queued_retry, groupbytrace)
// will create a new context, so, those should be avoided when using this processor.Although the HTTP spec allows headers to be repeated,
// this processor will only use the first value.
// Required.
FromAttribute string `mapstructure:"from_attribute"`

// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`
}

// RoutingTableItem specifies how data should be routed to the different exporters
type RoutingTableItem struct {
// Value represents a possible value for the field specified under FromAttribute. Required.
Value string `mapstructure:"value"`

// Exporters contains the list of exporters to use when the value from the FromAttribute field matches this table item.
// When no exporters are specified, the ones specified under DefaultExporters are used, if any.
// The routing processor will fail upon the first failure from these exporters.
// Optional.
Exporters []string `mapstructure:"exporters"`
}
66 changes: 66 additions & 0 deletions processor/routingprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routingprocessor

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/jaegerexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory

// we don't need to use them in this test, but the config has them
factories.Exporters["otlp"] = otlpexporter.NewFactory()
factories.Exporters["jaeger"] = jaegerexporter.NewFactory()

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

parsed := cfg.Processors["routing"]
assert.Equal(t, parsed,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: "routing",
TypeVal: "routing",
},
DefaultExporters: []string{"otlp"},
FromAttribute: "X-Tenant",
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"jaeger/acme", "otlp/acme"},
},
{
Value: "globex",
Exporters: []string{"otlp/globex"},
},
},
})
}
50 changes: 50 additions & 0 deletions processor/routingprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routingprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
// The value of "type" key in configuration.
typeStr = "routing"
)

// NewFactory creates a factory for the routing processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
)
}

func createDefaultConfig() configmodels.Processor {
return &Config{}
}

func createTraceProcessor(_ context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TraceConsumer) (component.TraceProcessor, error) {
_, ok := nextConsumer.(component.Processor)
if ok {
params.Logger.Warn("another processor has been defined after the routing processor: it will NOT receive any data!")
}
return newProcessor(params.Logger, cfg)
}
Loading

0 comments on commit 1b925a1

Please sign in to comment.