diff --git a/exporter/splunkhecexporter/batchperscope.go b/exporter/splunkhecexporter/batchperscope.go new file mode 100644 index 000000000000..5188200d9a9d --- /dev/null +++ b/exporter/splunkhecexporter/batchperscope.go @@ -0,0 +1,113 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/multierr" +) + +// perScopeBatcher is a consumer.Logs that rebatches logs by a type found in the scope name: profiling or regular logs. +type perScopeBatcher struct { + logsEnabled bool + profilingEnabled bool + next consumer.Logs +} + +func (rb *perScopeBatcher) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + var profilingFound bool + var otherLogsFound bool + + for i := 0; i < logs.ResourceLogs().Len(); i++ { + rs := logs.ResourceLogs().At(i) + for j := 0; j < rs.ScopeLogs().Len(); j++ { + if isProfilingData(rs.ScopeLogs().At(j)) { + profilingFound = true + } else { + otherLogsFound = true + } + } + if profilingFound && otherLogsFound { + break + } + } + + // if we don't have both types of logs, just call next if enabled + if !profilingFound || !otherLogsFound { + if rb.logsEnabled && otherLogsFound { + return rb.next.ConsumeLogs(ctx, logs) + } + if rb.profilingEnabled && profilingFound { + return rb.next.ConsumeLogs(ctx, logs) + } + return nil + } + + profilingLogs := plog.NewLogs() + otherLogs := plog.NewLogs() + + for i := 0; i < logs.ResourceLogs().Len(); i++ { + rs := logs.ResourceLogs().At(i) + profilingFound = false + otherLogsFound = false + for j := 0; j < rs.ScopeLogs().Len(); j++ { + sl := rs.ScopeLogs().At(j) + if isProfilingData(sl) { + profilingFound = true + } else { + otherLogsFound = true + } + } + switch { + case profilingFound && otherLogsFound: + if rb.profilingEnabled { + copyResourceLogs(rs, profilingLogs.ResourceLogs().AppendEmpty(), true) + } + if rb.logsEnabled { + copyResourceLogs(rs, otherLogs.ResourceLogs().AppendEmpty(), false) + } + case profilingFound && rb.profilingEnabled: + rs.CopyTo(profilingLogs.ResourceLogs().AppendEmpty()) + case otherLogsFound && rb.logsEnabled: + rs.CopyTo(otherLogs.ResourceLogs().AppendEmpty()) + } + } + + var err error + if rb.logsEnabled { + err = multierr.Append(err, rb.next.ConsumeLogs(ctx, otherLogs)) + } + if rb.profilingEnabled { + err = multierr.Append(err, rb.next.ConsumeLogs(ctx, profilingLogs)) + } + return err +} + +func copyResourceLogs(src plog.ResourceLogs, dest plog.ResourceLogs, isProfiling bool) { + src.Resource().CopyTo(dest.Resource()) + for j := 0; j < src.ScopeLogs().Len(); j++ { + sl := src.ScopeLogs().At(j) + if isProfilingData(sl) == isProfiling { + sl.CopyTo(dest.ScopeLogs().AppendEmpty()) + } + } +} diff --git a/exporter/splunkhecexporter/batchperscope_test.go b/exporter/splunkhecexporter/batchperscope_test.go new file mode 100644 index 000000000000..e138ae2b5c1e --- /dev/null +++ b/exporter/splunkhecexporter/batchperscope_test.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" +) + +func TestBatchLogs_ConsumeLogs(t *testing.T) { + tests := []struct { + name string + profilingEnabled bool + logsEnabled bool + in string + out []string + }{ + { + name: "profiling_only_both_enabled", + profilingEnabled: true, + logsEnabled: true, + in: "profiling_only.yaml", + out: []string{"profiling_only.yaml"}, + }, + { + name: "profiling_only_profiling_enabled", + profilingEnabled: true, + in: "profiling_only.yaml", + out: []string{"profiling_only.yaml"}, + }, + { + name: "profiling_only_profiling_disabled", + logsEnabled: true, + in: "profiling_only.yaml", + out: []string{}, + }, + { + name: "regular_logs_only_both_enabled", + profilingEnabled: true, + logsEnabled: true, + in: "regular_logs_only.yaml", + out: []string{"regular_logs_only.yaml"}, + }, + { + name: "regular_logs_only_logs_enabled", + logsEnabled: true, + in: "regular_logs_only.yaml", + out: []string{"regular_logs_only.yaml"}, + }, + { + name: "regular_logs_only_logs_disabled", + profilingEnabled: true, + in: "regular_logs_only.yaml", + out: []string{}, + }, + { + name: "combined_both_enabled", + profilingEnabled: true, + logsEnabled: true, + in: "combined.yaml", + out: []string{"regular_logs_only.yaml", "profiling_only.yaml"}, + }, + { + name: "combined_profiling_disabled", + logsEnabled: true, + in: "combined.yaml", + out: []string{"regular_logs_only.yaml"}, + }, + { + name: "combined_logs_disabled", + profilingEnabled: true, + in: "combined.yaml", + out: []string{"profiling_only.yaml"}, + }, + { + name: "combined_both_disabled", + in: "combined.yaml", + out: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := &consumertest.LogsSink{} + consumer := &perScopeBatcher{ + profilingEnabled: tt.profilingEnabled, + logsEnabled: tt.logsEnabled, + next: sink, + } + + logs, err := golden.ReadLogs("testdata/batchperscope/" + tt.in) + require.NoError(t, err) + + err = consumer.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + + assert.Equal(t, len(tt.out), len(sink.AllLogs())) + for i, out := range tt.out { + expected, err := golden.ReadLogs("testdata/batchperscope/" + out) + require.NoError(t, err) + assert.NoError(t, plogtest.CompareLogs(expected, sink.AllLogs()[i])) + } + }) + } + +} diff --git a/exporter/splunkhecexporter/go.mod b/exporter/splunkhecexporter/go.mod index 77c7df760ad3..afa813ce18b9 100644 --- a/exporter/splunkhecexporter/go.mod +++ b/exporter/splunkhecexporter/go.mod @@ -8,6 +8,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.77.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.77.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.77.0 github.com/stretchr/testify v1.8.2 go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.77.0 @@ -22,6 +23,7 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect @@ -37,6 +39,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.77.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.9.0 // indirect go.opentelemetry.io/collector/featuregate v0.77.0 // indirect diff --git a/exporter/splunkhecexporter/go.sum b/exporter/splunkhecexporter/go.sum index d79bc1946471..eee053099494 100644 --- a/exporter/splunkhecexporter/go.sum +++ b/exporter/splunkhecexporter/go.sum @@ -33,6 +33,7 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= diff --git a/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml b/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml new file mode 100644 index 000000000000..c41db108609c --- /dev/null +++ b/exporter/splunkhecexporter/testdata/batchperscope/combined.yaml @@ -0,0 +1,109 @@ +resourceLogs: + - resource: + attributes: + - key: resource1_attribute1 + value: + stringValue: value1 + - key: resource2_attribute2 + value: + stringValue: value2 + scopeLogs: + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource1_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_prof_scope_log1_body + timeUnixNano: "11651379494838206464" + - attributes: + - key: resource1_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_prof_scope_log2_body + timeUnixNano: "11651379494838206472" + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource1_scope1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_scope1_body + timeUnixNano: "11651379494838207123" + - scope: + name: external_service + version: 0.2.0 + logRecords: + - attributes: + - key: resource1_scope2_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_log2_body + timeUnixNano: "11651379494838207153" + - resource: + attributes: + - key: resource2_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource2_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource2_log2_body + timeUnixNano: "11651379494838209123" + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource2_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource2_prof_scope_log1_body + timeUnixNano: "11651379494838206532" + - resource: + attributes: + - key: resource3_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource3_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource3_log2_body + timeUnixNano: "11651379494838221123" + - resource: + attributes: + - key: resource4_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource4_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource4_prof_scope_log1_body + timeUnixNano: "11651379494838214321" diff --git a/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml b/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml new file mode 100644 index 000000000000..a017eeba18e8 --- /dev/null +++ b/exporter/splunkhecexporter/testdata/batchperscope/profiling_only.yaml @@ -0,0 +1,59 @@ +resourceLogs: + - resource: + attributes: + - key: resource1_attribute1 + value: + stringValue: value1 + - key: resource2_attribute2 + value: + stringValue: value2 + scopeLogs: + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource1_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_prof_scope_log1_body + timeUnixNano: "11651379494838206464" + - attributes: + - key: resource1_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_prof_scope_log2_body + timeUnixNano: "11651379494838206472" + - resource: + attributes: + - key: resource2_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource2_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource2_prof_scope_log1_body + timeUnixNano: "11651379494838206532" + - resource: + attributes: + - key: resource4_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel.profiling + logRecords: + - attributes: + - key: resource4_prof_scope_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource4_prof_scope_log1_body + timeUnixNano: "11651379494838214321" diff --git a/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml b/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml new file mode 100644 index 000000000000..787813c27364 --- /dev/null +++ b/exporter/splunkhecexporter/testdata/batchperscope/regular_logs_only.yaml @@ -0,0 +1,66 @@ +resourceLogs: + - resource: + attributes: + - key: resource1_attribute1 + value: + stringValue: value1 + - key: resource2_attribute2 + value: + stringValue: value2 + scopeLogs: + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource1_scope1_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_scope1_body + timeUnixNano: "11651379494838207123" + - scope: + name: external_service + version: 0.2.0 + logRecords: + - attributes: + - key: resource1_scope2_attr1 + value: + stringValue: value1 + body: + stringValue: resource1_log2_body + timeUnixNano: "11651379494838207153" + - resource: + attributes: + - key: resource2_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource2_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource2_log2_body + timeUnixNano: "11651379494838209123" + - resource: + attributes: + - key: resource3_attribute1 + value: + stringValue: value1 + scopeLogs: + - scope: + name: otel_collector + version: 0.1.0 + logRecords: + - attributes: + - key: resource3_log1_attr1 + value: + stringValue: value1 + body: + stringValue: resource3_log2_body + timeUnixNano: "11651379494838221123"