Skip to content

Commit

Permalink
[routingconnector] convert routing processor to a connector (open-tel…
Browse files Browse the repository at this point in the history
…emetry#21498)

**Description:** 

This PR introduces a connector version of the routing processor. It
currently supports routing based on resource attributes OTTL statements
only. Context based routing will be added later in a followup PR.

There are two issues regarding fanout consumers that are being addressed
in the core repo.
* The routing connector needs to be a consumer in multiple pipelines
(the routing processor can route to a single exporter).
* Will be resolved by:
open-telemetry/opentelemetry-collector#7840
* We need the ability to create connector routers to facilitate testing.
I had to temporarily copy the fanoutconsumer package into the routing
connector codebase due to package visibility issues.
* Will be resolved by:
open-telemetry/opentelemetry-collector#7673

We can address these issues as the relevant PRs land on the core repo.

cc: @djaglowski, @jpkrohling, @kovrus 

**Link to tracking Issue:** 
open-telemetry#19738

**Testing:** 
Unit / manual

**Documentation:**
The readme has been ported from the routing processor

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
Co-authored-by: Ruslan Kovalov <[email protected]>
  • Loading branch information
3 people committed Jul 6, 2023
1 parent afb5b9b commit d7615dd
Show file tree
Hide file tree
Showing 32 changed files with 3,916 additions and 6 deletions.
16 changes: 16 additions & 0 deletions .chloggen/routing_connector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: A connector version of the routingprocessor

# One or more tracking issues related to the change
issues: [19738]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ cmd/telemetrygen/ @open-telemetry/collect
confmap/provider/s3provider/ @open-telemetry/collector-contrib-approvers @Aneurysm9

connector/countconnector/ @open-telemetry/collector-contrib-approvers @djaglowski @jpkrohling
connector/routingconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @kovrus @mwear
connector/servicegraphconnector/ @open-telemetry/collector-contrib-approvers @jpkrohling @mapno
connector/spanmetricsconnector/ @open-telemetry/collector-contrib-approvers @albertteoh @kovrus

Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ body:
- cmd/telemetrygen
- confmap/provider/s3provider
- connector/count
- connector/routing
- connector/servicegraph
- connector/spanmetrics
- examples/demo
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ body:
- cmd/telemetrygen
- confmap/provider/s3provider
- connector/count
- connector/routing
- connector/servicegraph
- connector/spanmetrics
- examples/demo
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ body:
- cmd/telemetrygen
- confmap/provider/s3provider
- connector/count
- connector/routing
- connector/servicegraph
- connector/spanmetrics
- examples/demo
Expand Down
10 changes: 5 additions & 5 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ updates:
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/connector/routingconnector"
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/connector/servicegraphconnector"
schedule:
Expand Down Expand Up @@ -1097,8 +1102,3 @@ updates:
schedule:
interval: "weekly"
day: "wednesday"
- package-ecosystem: "gomod"
directory: "/receiver/syslogreceiver"
schedule:
interval: "weekly"
day: "wednesday"
14 changes: 13 additions & 1 deletion cmd/mdatagen/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,19 @@ func (s *Status) validateStability() error {
errs = multierr.Append(errs, fmt.Errorf("missing component for stability: %v", stability))
}
for _, c := range component {
if c != "metrics" && c != "traces" && c != "logs" && c != "traces_to_metrics" && c != "metrics_to_metrics" && c != "logs_to_metrics" && c != "extension" {
if c != "metrics" &&
c != "traces" &&
c != "logs" &&
c != "traces_to_traces" &&
c != "traces_to_metrics" &&
c != "traces_to_logs" &&
c != "metrics_to_traces" &&
c != "metrics_to_metrics" &&
c != "metrics_to_logs" &&
c != "logs_to_traces" &&
c != "logs_to_metrics" &&
c != "logs_to_logs" &&
c != "extension" {
errs = multierr.Append(errs, fmt.Errorf("invalid component: %v", c))
}
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ receivers:
connectors:
- gomod: go.opentelemetry.io/collector/connector/forwardconnector v0.81.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.81.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector v0.81.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.81.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.81.0

Expand Down Expand Up @@ -411,6 +412,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector => ../../connector/countconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector => ../../connector/routingconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ../../connector/servicegraphconnector
- github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ../../connector/spanmetricsconnector
- github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go 1.19

require (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.81.0
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/alibabacloudlogserviceexporter v0.81.0
Expand Down Expand Up @@ -1108,6 +1109,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/countconnector => ../../connector/countconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector => ../../connector/routingconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ../../connector/servicegraphconnector

replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ../../connector/spanmetricsconnector
Expand Down
1 change: 1 addition & 0 deletions connector/routingconnector/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
105 changes: 105 additions & 0 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Routing Connector

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Distributions | [contrib] |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

## Supported Pipeline Types

| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
| ------------------------ | ------------------------ | ----------------- |
| traces | traces | [development] |
| metrics | metrics | [development] |
| logs | logs | [development] |

[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
<!-- end autogenerated section -->

Routes logs, metrics or traces based on resource attributes to specific pipelines using [OpenTelemetry Transformation Language (OTTL)](../../pkg/ottl/README.md) statements as routing conditions.

## Configuration

If you are not already familiar with connectors, you may find it helpful to first visit the [Connectors README].

The following settings are available:

- `table (required)`: the routing table for this connector.
- `table.statement (required)`: the routing condition provided as the [OTTL] statement.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `ignore` and `propagate`. If `ignored` is used and a statement's condition has an error then the payload will be routed to the default pipelines. If not supplied, `propagate` is used.

Example:

```yaml
receivers:
otlp:

exporters:
jaeger:
endpoint: localhost:14250
jaeger/acme:
endpoint: localhost:24250
jaeger/ecorp:
endpoint: localhost:34250

connectors:
routing:
default_pipelines: [traces/jaeger]
error_mode: ignore
table:
- statement: route() where attributes["X-Tenant"] == "acme"
pipelines: [traces/jaeger-acme]
- statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp")
pipelines: [traces/jaeger-ecorp]

service:
pipelines:
traces/in:
receivers: [oltp]
exporters: [routing]
traces/jaeger:
receivers: [routing]
exporters: [jaeger]
traces/jaeger-acme:
receivers: [routing]
exporters: [jaeger/acme]
traces/jaeger-ecorp:
receivers: [routing]
exporters: [jaeger/ecorp]
```

A signal may get matched by routing conditions of more than one routing table entry. In this case, the signal will be routed to all pipelines of matching routes.
Respectively, if none of the routing conditions met, then a signal is routed to default pipelines.

## Differences between the Routing Connector and Routing Processor

- The connector will only route using [OTTL] statements which can only be applied to resource attributes. It does not support matching on context values at this time.
- The connector routes to pipelines, not exporters as the processor does.

### OTTL Limitations
- Currently, it is not possible to specify boolean statements without function invocation as the routing condition. It is required to provide the NOOP `route()` or any other supported function as part of the routing statement, see [#13545](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13545) for more information.
- Supported [OTTL] functions:
- [IsMatch](../../pkg/ottl/ottlfuncs/README.md#IsMatch)
- [delete_key](../../pkg/ottl/ottlfuncs/README.md#delete_key)
- [delete_matching_keys](../../pkg/ottl/ottlfuncs/README.md#delete_matching_keys)

## Additional Settings
The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files:

- [logs](./testdata/config_logs.yaml)
- [metrics](./testdata/config_metrics.yaml)
- [traces](./testdata/config_traces.yaml)

[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
[Exporter Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/processing.md#telemetry-query-language
78 changes: 78 additions & 0 deletions connector/routingconnector/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package routingconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector"

import (
"errors"

"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

var (
errEmptyRoute = errors.New("invalid route: no statement provided")
errNoPipelines = errors.New("invalid route: no pipelines defined")
errUnexpectedConsumer = errors.New("expected consumer to be a connector router")
errNoTableItems = errors.New("invalid routing table: the routing table is empty")
)

// Config defines configuration for the Routing processor.
type Config struct {
// DefaultPipelines contains the list of pipelines to use when a more specific record can't be
// found in the routing table.
// Optional.
DefaultPipelines []component.ID `mapstructure:"default_pipelines"`

// ErrorMode determines how the processor reacts to errors that occur while processing an OTTL
// condition.
// Valid values are `ignore` and `propagate`.
// `ignore` means the processor ignores errors returned by conditions and continues on to the
// next condition. This is the recommended mode. If `ignored` is used and a statement's
// condition has an error then the payload will be routed to the default exporter. `propagate`
// means the processor returns the error up the pipeline. This will result in the payload being
// dropped from the collector.
// The default value is `propagate`.
ErrorMode ottl.ErrorMode `mapstructure:"error_mode"`

// Table contains the routing table for this processor.
// Required.
Table []RoutingTableItem `mapstructure:"table"`
}

// Validate checks if the processor configuration is valid.
func (c *Config) Validate() error {
// validate that there's at least one item in the table
if len(c.Table) == 0 {
return errNoTableItems
}

// validate that every route has a value for the routing attribute and has
// at least one pipeline
for _, item := range c.Table {
if len(item.Statement) == 0 {
return errEmptyRoute
}

if len(item.Pipelines) == 0 {
return errNoPipelines
}
}

return nil
}

// RoutingTableItem specifies how data should be routed to the different pipelines
type RoutingTableItem struct {
// Statement is a OTTL statement used for making a routing decision.
// Required when 'Value' isn't provided.
Statement string `mapstructure:"statement"`

// Pipelines contains the list of pipelines to use when the value from the FromAttribute field
// matches this table item. When no pipelines are specified, the ones specified under
// DefaultPipelines are used, if any.
// The routing processor will fail upon the first failure from these pipelines.
// Optional.
Pipelines []component.ID `mapstructure:"pipelines"`
}
Loading

0 comments on commit d7615dd

Please sign in to comment.