diff --git a/.chloggen/feat_routing_match_once.yaml b/.chloggen/feat_routing_match_once.yaml new file mode 100755 index 0000000000000..66332284835fa --- /dev/null +++ b/.chloggen/feat_routing_match_once.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: routingconnector supports matching the statement only once + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26353] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md index d1dfef73ab025..ff9e5b8bb416e 100644 --- a/connector/routingconnector/README.md +++ b/connector/routingconnector/README.md @@ -36,6 +36,7 @@ The following settings are available: - `table.pipelines (required)`: the list of pipelines to use when the routing condition is met. - `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions. - `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignored` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used. +- `match_once (optional, default: false)`: determines whether the connector matches multiple statements or not. If enabled, the payload will be routed to the first pipeline in the `table` whose routing condition is met. Example: @@ -55,12 +56,23 @@ connectors: routing: default_pipelines: [traces/jaeger] error_mode: ignore + match_once: false table: - statement: route() where attributes["X-Tenant"] == "acme" pipelines: [traces/jaeger-acme] - statement: delete_key(attributes, "X-Tenant") where IsMatch(attributes["X-Tenant"], ".*corp") pipelines: [traces/jaeger-ecorp] + routing/match_once: + default_pipelines: [traces/jaeger] + error_mode: ignore + match_once: true + table: + - statement: route() where attributes["X-Tenant"] == "acme" + pipelines: [traces/jaeger-acme] + - statement: route() where attributes["X-Tenant"] == ".*acme" + pipelines: [traces/jaeger-ecorp] + service: pipelines: traces/in: diff --git a/connector/routingconnector/config.go b/connector/routingconnector/config.go index 58c5102dff5c6..11abda52b2172 100644 --- a/connector/routingconnector/config.go +++ b/connector/routingconnector/config.go @@ -39,6 +39,10 @@ type Config struct { // Table contains the routing table for this processor. // Required. Table []RoutingTableItem `mapstructure:"table"` + + // MatchOnce determines whether the connector matches multiple statements. + // Optional. + MatchOnce bool `mapstructure:"match_once"` } // Validate checks if the processor configuration is valid. diff --git a/connector/routingconnector/logs.go b/connector/routingconnector/logs.go index e1db011eefda1..eb58175a25e96 100644 --- a/connector/routingconnector/logs.go +++ b/connector/routingconnector/logs.go @@ -72,7 +72,7 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { rtx := ottlresource.NewTransformContext(rlogs.Resource()) noRoutesMatch := true - for _, route := range c.router.routes { + for _, route := range c.router.routeSlice { _, isMatch, err := route.statement.Execute(ctx, rtx) if err != nil { if c.config.ErrorMode == ottl.PropagateError { @@ -84,6 +84,9 @@ func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { if isMatch { noRoutesMatch = false c.group(groups, route.consumer, rlogs) + if c.config.MatchOnce { + break + } } } diff --git a/connector/routingconnector/logs_test.go b/connector/routingconnector/logs_test.go index ed2c8dcc0cba5..b68ec958fca0d 100644 --- a/connector/routingconnector/logs_test.go +++ b/connector/routingconnector/logs_test.go @@ -229,6 +229,158 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { }) } +func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { + logsDefault := component.NewIDWithName(component.DataTypeLogs, "default") + logs0 := component.NewIDWithName(component.DataTypeLogs, "0") + logs1 := component.NewIDWithName(component.DataTypeLogs, "1") + + cfg := &Config{ + DefaultPipelines: []component.ID{logsDefault}, + Table: []RoutingTableItem{ + { + Statement: `route() where IsMatch(attributes["X-Tenant"], ".*acme") == true`, + Pipelines: []component.ID{logs0}, + }, + { + Statement: `route() where IsMatch(attributes["X-Tenant"], "_acme") == true`, + Pipelines: []component.ID{logs1}, + }, + { + Statement: `route() where attributes["X-Tenant"] == "ecorp"`, + Pipelines: []component.ID{logsDefault, logs0}, + }, + }, + MatchOnce: true, + } + + var defaultSink, sink0, sink1 consumertest.LogsSink + + router := connectortest.NewLogsRouter( + connectortest.WithLogsSink(logsDefault, &defaultSink), + connectortest.WithLogsSink(logs0, &sink0), + connectortest.WithLogsSink(logs1, &sink1), + ) + + resetSinks := func() { + defaultSink.Reset() + sink0.Reset() + sink1.Reset() + } + + factory := NewFactory() + conn, err := factory.CreateLogsToLogs( + context.Background(), + connectortest.NewNopCreateSettings(), + cfg, + router.(consumer.Logs), + ) + + require.NoError(t, err) + require.NotNil(t, conn) + require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(t, conn.Shutdown(context.Background())) + }() + + t.Run("logs matched by no expressions", func(t *testing.T) { + resetSinks() + + l := plog.NewLogs() + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "something-else") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + require.NoError(t, conn.ConsumeLogs(context.Background(), l)) + + assert.Len(t, defaultSink.AllLogs(), 1) + assert.Len(t, sink0.AllLogs(), 0) + assert.Len(t, sink1.AllLogs(), 0) + }) + + t.Run("logs matched one expression", func(t *testing.T) { + resetSinks() + + l := plog.NewLogs() + + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "xacme") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + require.NoError(t, conn.ConsumeLogs(context.Background(), l)) + + assert.Len(t, defaultSink.AllLogs(), 0) + assert.Len(t, sink0.AllLogs(), 1) + assert.Len(t, sink1.AllLogs(), 0) + }) + + t.Run("logs matched by two expressions, but sinks to one", func(t *testing.T) { + resetSinks() + + l := plog.NewLogs() + + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "x_acme") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + rl = l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "_acme") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + require.NoError(t, conn.ConsumeLogs(context.Background(), l)) + + assert.Len(t, defaultSink.AllLogs(), 0) + assert.Len(t, sink0.AllLogs(), 1) + assert.Len(t, sink1.AllLogs(), 0) + + assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 2) + }) + + t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) { + resetSinks() + + l := plog.NewLogs() + + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "_acme") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + rl = l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "something-else") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + require.NoError(t, conn.ConsumeLogs(context.Background(), l)) + + assert.Len(t, defaultSink.AllLogs(), 1) + assert.Len(t, sink0.AllLogs(), 1) + assert.Len(t, sink1.AllLogs(), 0) + + rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0) + attr, ok := rlog.Resource().Attributes().Get("X-Tenant") + assert.True(t, ok, "routing attribute must exists") + assert.Equal(t, attr.AsString(), "something-else") + }) + + t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) { + resetSinks() + + l := plog.NewLogs() + + rl := l.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("X-Tenant", "ecorp") + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + + require.NoError(t, conn.ConsumeLogs(context.Background(), l)) + + assert.Len(t, defaultSink.AllLogs(), 1) + assert.Len(t, sink0.AllLogs(), 1) + assert.Len(t, sink1.AllLogs(), 0) + + assert.Equal(t, defaultSink.AllLogs()[0].LogRecordCount(), 1) + assert.Equal(t, sink0.AllLogs()[0].LogRecordCount(), 1) + assert.Equal(t, defaultSink.AllLogs(), sink0.AllLogs()) + }) +} + func TestLogsResourceAttributeDroppedByOTTL(t *testing.T) { logsDefault := component.NewIDWithName(component.DataTypeLogs, "default") logsOther := component.NewIDWithName(component.DataTypeLogs, "other") diff --git a/connector/routingconnector/metrics.go b/connector/routingconnector/metrics.go index c0c7fdad5e7e7..5c186f424f9a4 100644 --- a/connector/routingconnector/metrics.go +++ b/connector/routingconnector/metrics.go @@ -72,7 +72,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric rtx := ottlresource.NewTransformContext(rmetrics.Resource()) noRoutesMatch := true - for _, route := range c.router.routes { + for _, route := range c.router.routeSlice { _, isMatch, err := route.statement.Execute(ctx, rtx) if err != nil { if c.config.ErrorMode == ottl.PropagateError { @@ -84,6 +84,9 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric if isMatch { noRoutesMatch = false c.group(groups, route.consumer, rmetrics) + if c.config.MatchOnce { + break + } } } diff --git a/connector/routingconnector/metrics_test.go b/connector/routingconnector/metrics_test.go index 377d966865f84..02f732401efa0 100644 --- a/connector/routingconnector/metrics_test.go +++ b/connector/routingconnector/metrics_test.go @@ -244,6 +244,173 @@ func TestMetricsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { }) } +func TestMetricsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { + metricsDefault := component.NewIDWithName(component.DataTypeMetrics, "default") + metrics0 := component.NewIDWithName(component.DataTypeMetrics, "0") + metrics1 := component.NewIDWithName(component.DataTypeMetrics, "1") + + cfg := &Config{ + DefaultPipelines: []component.ID{metricsDefault}, + Table: []RoutingTableItem{ + { + Statement: `route() where attributes["value"] > 2.5`, + Pipelines: []component.ID{metrics0}, + }, + { + Statement: `route() where attributes["value"] > 3.0`, + Pipelines: []component.ID{metrics1}, + }, + { + Statement: `route() where attributes["value"] == 1.0`, + Pipelines: []component.ID{metricsDefault, metrics0}, + }, + }, + MatchOnce: true, + } + + var defaultSink, sink0, sink1 consumertest.MetricsSink + + router := connectortest.NewMetricsRouter( + connectortest.WithMetricsSink(metricsDefault, &defaultSink), + connectortest.WithMetricsSink(metrics0, &sink0), + connectortest.WithMetricsSink(metrics1, &sink1), + ) + + resetSinks := func() { + defaultSink.Reset() + sink0.Reset() + sink1.Reset() + } + + factory := NewFactory() + conn, err := factory.CreateMetricsToMetrics( + context.Background(), + connectortest.NewNopCreateSettings(), + cfg, + router.(consumer.Metrics), + ) + + require.NoError(t, err) + require.NotNil(t, conn) + require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(t, conn.Shutdown(context.Background())) + }() + + t.Run("metric matched by no expressions", func(t *testing.T) { + resetSinks() + + m := pmetric.NewMetrics() + + rm := m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 0.0) + metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu") + + require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) + + assert.Len(t, defaultSink.AllMetrics(), 1) + assert.Len(t, sink0.AllMetrics(), 0) + assert.Len(t, sink1.AllMetrics(), 0) + }) + + t.Run("metric matched by one of two expressions", func(t *testing.T) { + resetSinks() + + m := pmetric.NewMetrics() + + rm := m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 2.7) + metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu") + + require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) + + assert.Len(t, defaultSink.AllMetrics(), 0) + assert.Len(t, sink0.AllMetrics(), 1) + assert.Len(t, sink1.AllMetrics(), 0) + }) + + t.Run("metric matched by two expressions, but sinks to one", func(t *testing.T) { + resetSinks() + + m := pmetric.NewMetrics() + + rm := m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 5.0) + metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu") + + rm = m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 3.1) + metric = rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu1") + + require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) + + assert.Len(t, defaultSink.AllMetrics(), 0) + assert.Len(t, sink0.AllMetrics(), 1) + assert.Len(t, sink1.AllMetrics(), 0) + + assert.Equal(t, sink0.AllMetrics()[0].MetricCount(), 2) + }) + + t.Run("one metric matched by 2 expressions, others matched by none", func(t *testing.T) { + resetSinks() + + m := pmetric.NewMetrics() + + rm := m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 5.0) + metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu") + + rm = m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", -1.0) + metric = rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu1") + + require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) + + assert.Len(t, defaultSink.AllMetrics(), 1) + assert.Len(t, sink0.AllMetrics(), 1) + assert.Len(t, sink1.AllMetrics(), 0) + + rmetric := defaultSink.AllMetrics()[0].ResourceMetrics().At(0) + attr, ok := rmetric.Resource().Attributes().Get("value") + assert.True(t, ok, "routing attribute must exist") + assert.Equal(t, attr.Double(), float64(-1.0)) + }) + + t.Run("metric matched by one expression, multiple pipelines", func(t *testing.T) { + resetSinks() + + m := pmetric.NewMetrics() + + rm := m.ResourceMetrics().AppendEmpty() + rm.Resource().Attributes().PutDouble("value", 1.0) + metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetEmptyGauge() + metric.SetName("cpu") + + require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) + + assert.Len(t, defaultSink.AllMetrics(), 1) + assert.Len(t, sink0.AllMetrics(), 1) + assert.Len(t, sink1.AllMetrics(), 0) + + assert.Equal(t, defaultSink.AllMetrics()[0].MetricCount(), 1) + assert.Equal(t, sink0.AllMetrics()[0].MetricCount(), 1) + assert.Equal(t, defaultSink.AllMetrics(), sink0.AllMetrics()) + }) +} + func TestMetricsResourceAttributeDroppedByOTTL(t *testing.T) { metricsDefault := component.NewIDWithName(component.DataTypeMetrics, "default") metricsOther := component.NewIDWithName(component.DataTypeMetrics, "other") diff --git a/connector/routingconnector/router.go b/connector/routingconnector/router.go index 65a4b639d1d61..53558c6ff71a2 100644 --- a/connector/routingconnector/router.go +++ b/connector/routingconnector/router.go @@ -29,8 +29,9 @@ type router[C any] struct { logger *zap.Logger parser ottl.Parser[ottlresource.TransformContext] - table []RoutingTableItem - routes map[string]routingItem[C] + table []RoutingTableItem + routes map[string]routingItem[C] + routeSlice []routingItem[C] defaultConsumer C consumerProvider consumerProvider[C] @@ -125,6 +126,9 @@ func (r *router[C]) registerRouteConsumers() error { return fmt.Errorf("%w: %s", errPipelineNotFound, err.Error()) } route.consumer = consumer + if !ok { + r.routeSlice = append(r.routeSlice, route) + } r.routes[key(item)] = route } diff --git a/connector/routingconnector/traces.go b/connector/routingconnector/traces.go index 99397d4482ab5..73735c75a2750 100644 --- a/connector/routingconnector/traces.go +++ b/connector/routingconnector/traces.go @@ -71,7 +71,7 @@ func (c *tracesConnector) ConsumeTraces(ctx context.Context, t ptrace.Traces) er rtx := ottlresource.NewTransformContext(rspans.Resource()) noRoutesMatch := true - for _, route := range c.router.routes { + for _, route := range c.router.routeSlice { _, isMatch, err := route.statement.Execute(ctx, rtx) if err != nil { if c.config.ErrorMode == ottl.PropagateError { @@ -83,6 +83,9 @@ func (c *tracesConnector) ConsumeTraces(ctx context.Context, t ptrace.Traces) er if isMatch { noRoutesMatch = false c.group(groups, route.consumer, rspans) + if c.config.MatchOnce { + break + } } } diff --git a/connector/routingconnector/traces_test.go b/connector/routingconnector/traces_test.go index f45c6fcb7c510..20397cedeb451 100644 --- a/connector/routingconnector/traces_test.go +++ b/connector/routingconnector/traces_test.go @@ -204,6 +204,135 @@ func TestTracesCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { }) } +func TestTracesCorrectlyMatchOnceWithOTTL(t *testing.T) { + tracesDefault := component.NewIDWithName(component.DataTypeTraces, "default") + traces0 := component.NewIDWithName(component.DataTypeTraces, "0") + traces1 := component.NewIDWithName(component.DataTypeTraces, "1") + + cfg := &Config{ + DefaultPipelines: []component.ID{tracesDefault}, + MatchOnce: true, + Table: []RoutingTableItem{ + { + Statement: `route() where attributes["value"] > 0 and attributes["value"] < 4`, + Pipelines: []component.ID{traces0}, + }, + { + Statement: `route() where attributes["value"] > 1 and attributes["value"] < 4`, + Pipelines: []component.ID{traces1}, + }, + { + Statement: `route() where attributes["value"] == 5`, + Pipelines: []component.ID{tracesDefault, traces0}, + }, + }, + } + + var defaultSink, sink0, sink1 consumertest.TracesSink + + resetSinks := func() { + defaultSink.Reset() + sink0.Reset() + sink1.Reset() + } + + router := connectortest.NewTracesRouter( + connectortest.WithTracesSink(tracesDefault, &defaultSink), + connectortest.WithTracesSink(traces0, &sink0), + connectortest.WithTracesSink(traces1, &sink1), + ) + + factory := NewFactory() + conn, err := factory.CreateTracesToTraces( + context.Background(), + connectortest.NewNopCreateSettings(), + cfg, + router.(consumer.Traces), + ) + + require.NoError(t, err) + require.NotNil(t, conn) + require.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + assert.NoError(t, conn.Shutdown(context.Background())) + }() + + t.Run("span matched by 0 expressions", func(t *testing.T) { + resetSinks() + + tr := ptrace.NewTraces() + rl := tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutInt("value", 10) + span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("span") + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + + assert.Len(t, defaultSink.AllTraces(), 1) + assert.Len(t, sink0.AllTraces(), 0) + assert.Len(t, sink1.AllTraces(), 0) + }) + + t.Run("span matched by one of two expressions", func(t *testing.T) { + resetSinks() + + tr := ptrace.NewTraces() + rl := tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutInt("value", 1) + span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("span") + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + + assert.Len(t, defaultSink.AllTraces(), 0) + assert.Len(t, sink0.AllTraces(), 1) + assert.Len(t, sink1.AllTraces(), 0) + }) + + t.Run("span matched by all expressions, but sinks to one", func(t *testing.T) { + resetSinks() + + tr := ptrace.NewTraces() + rl := tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutInt("value", 2) + span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("span") + + rl = tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutInt("value", 3) + span = rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("span1") + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + + assert.Len(t, defaultSink.AllTraces(), 0) + assert.Len(t, sink0.AllTraces(), 1) + assert.Len(t, sink1.AllTraces(), 0) + + assert.Equal(t, sink0.AllTraces()[0].SpanCount(), 2) + }) + + t.Run("span matched by one expression, multiple pipelines", func(t *testing.T) { + resetSinks() + + tr := ptrace.NewTraces() + rl := tr.ResourceSpans().AppendEmpty() + rl.Resource().Attributes().PutInt("value", 5) + span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("span") + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + + assert.Len(t, defaultSink.AllTraces(), 1) + assert.Len(t, sink0.AllTraces(), 1) + assert.Len(t, sink1.AllTraces(), 0) + + assert.Equal(t, defaultSink.AllTraces()[0].SpanCount(), 1) + assert.Equal(t, sink0.AllTraces()[0].SpanCount(), 1) + assert.Equal(t, defaultSink.AllTraces(), sink0.AllTraces()) + }) +} + func TestTracesResourceAttributeDroppedByOTTL(t *testing.T) { tracesDefault := component.NewIDWithName(component.DataTypeTraces, "default") tracesOther := component.NewIDWithName(component.DataTypeTraces, "other")