Skip to content

Commit

Permalink
[chore] [exporter/splunkhec] Add per scope batcher (open-telemetry#21900
Browse files Browse the repository at this point in the history
)

This batcher is intended to split incoming log batches into profiling logs and regular logs prior to processing, in order to simplify the exporter logic.

The batcher is written so that it introduces no overhead when the log batches don't contain mixed data, which is the most common use case.

This change just adds the batcher for now to make review easier. Actual enablement will come next.
  • Loading branch information
dmitryax committed May 13, 2023
1 parent d8c3210 commit d9024f6
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 0 deletions.
113 changes: 113 additions & 0 deletions exporter/splunkhecexporter/batchperscope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"

import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
)

// perScopeBatcher is a consumer.Logs that rebatches logs by a type found in the scope name: profiling or regular logs.
// Each scope is classified with isProfilingData; a batch handed to next never mixes the two types.
type perScopeBatcher struct {
// logsEnabled controls whether regular (non-profiling) logs are forwarded to next.
// When false, they are dropped.
logsEnabled bool
// profilingEnabled controls whether profiling logs are forwarded to next.
// When false, they are dropped.
profilingEnabled bool
// next is the consumer that receives the (possibly split) batches.
next consumer.Logs
}

// Capabilities reports that the batcher does not mutate the data it is given:
// ConsumeLogs either forwards the incoming batch untouched or builds its output
// from copies.
func (rb *perScopeBatcher) Capabilities() consumer.Capabilities {
	caps := consumer.Capabilities{}
	caps.MutatesData = false
	return caps
}

// ConsumeLogs forwards logs to the next consumer so that every forwarded batch holds
// either profiling scopes or regular log scopes, never both. Scopes are classified with
// isProfilingData. Data of a type whose flag (profilingEnabled / logsEnabled) is off is
// not forwarded.
//
// A batch that contains a single type is handed to the next consumer as is, without any
// copying; if that type is disabled, or the batch has no scopes at all, nothing is sent
// and nil is returned. Only a batch that mixes both types is copied into two new
// plog.Logs: the regular-logs batch is sent first, then the profiling batch, and the
// errors of both sends are combined.
func (rb *perScopeBatcher) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
	resources := logs.ResourceLogs()

	// First pass: detect which kinds of scopes are present. Scanning stops after the
	// first resource by which both kinds have been seen.
	hasProfiling, hasRegular := false, false
	for i := 0; i < resources.Len() && !(hasProfiling && hasRegular); i++ {
		scopes := resources.At(i).ScopeLogs()
		for j := 0; j < scopes.Len(); j++ {
			if isProfilingData(scopes.At(j)) {
				hasProfiling = true
			} else {
				hasRegular = true
			}
		}
	}

	// Fast path: at most one kind is present, so the original batch can be passed
	// through untouched, provided that kind is enabled.
	if !(hasProfiling && hasRegular) {
		if (rb.logsEnabled && hasRegular) || (rb.profilingEnabled && hasProfiling) {
			return rb.next.ConsumeLogs(ctx, logs)
		}
		return nil
	}

	// Slow path: the batch is mixed, so build one output batch per kind.
	profilingBatch := plog.NewLogs()
	regularBatch := plog.NewLogs()

	for i := 0; i < resources.Len(); i++ {
		res := resources.At(i)
		scopes := res.ScopeLogs()

		// Re-classify per resource; the flags now describe this resource only.
		hasProfiling, hasRegular = false, false
		for j := 0; j < scopes.Len(); j++ {
			if isProfilingData(scopes.At(j)) {
				hasProfiling = true
			} else {
				hasRegular = true
			}
		}

		if hasProfiling && hasRegular {
			// Mixed resource: split its scopes between the two outputs.
			if rb.profilingEnabled {
				copyResourceLogs(res, profilingBatch.ResourceLogs().AppendEmpty(), true)
			}
			if rb.logsEnabled {
				copyResourceLogs(res, regularBatch.ResourceLogs().AppendEmpty(), false)
			}
			continue
		}

		// Single-kind resource: copy it whole into the matching output, if enabled.
		if hasProfiling && rb.profilingEnabled {
			res.CopyTo(profilingBatch.ResourceLogs().AppendEmpty())
			continue
		}
		if hasRegular && rb.logsEnabled {
			res.CopyTo(regularBatch.ResourceLogs().AppendEmpty())
		}
	}

	// Send both outputs even if the first send fails, and report all failures.
	var errs error
	if rb.logsEnabled {
		errs = multierr.Append(errs, rb.next.ConsumeLogs(ctx, regularBatch))
	}
	if rb.profilingEnabled {
		errs = multierr.Append(errs, rb.next.ConsumeLogs(ctx, profilingBatch))
	}
	return errs
}

// copyResourceLogs fills dest with a filtered copy of src: the resource and the
// resource-level schema URL are copied as they are, and only those scope logs whose
// isProfilingData result equals isProfiling are copied over.
//
// dest is expected to be a freshly appended, empty ResourceLogs; matching scopes are
// appended to whatever dest already holds.
func copyResourceLogs(src plog.ResourceLogs, dest plog.ResourceLogs, isProfiling bool) {
	src.Resource().CopyTo(dest.Resource())
	// The schema URL lives on ResourceLogs, not on Resource, so it has to be carried
	// over explicitly; otherwise a split resource would lose it while a resource
	// copied whole with CopyTo keeps it.
	dest.SetSchemaUrl(src.SchemaUrl())

	scopes := src.ScopeLogs()
	for j := 0; j < scopes.Len(); j++ {
		sl := scopes.At(j)
		if isProfilingData(sl) != isProfiling {
			continue
		}
		sl.CopyTo(dest.ScopeLogs().AppendEmpty())
	}
}
125 changes: 125 additions & 0 deletions exporter/splunkhecexporter/batchperscope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

// TestBatchLogs_ConsumeLogs runs golden input batches through perScopeBatcher under
// different combinations of the profilingEnabled/logsEnabled flags and verifies which
// batches reach the next consumer, and in which order.
func TestBatchLogs_ConsumeLogs(t *testing.T) {
	// Directory (relative to the package) holding the golden input/output files.
	const testdataDir = "testdata/batchperscope/"

	tests := []struct {
		name             string
		profilingEnabled bool
		logsEnabled      bool
		in               string   // golden file fed to the batcher
		out              []string // golden files expected at the sink, in order
	}{
		{
			name:             "profiling_only_both_enabled",
			profilingEnabled: true,
			logsEnabled:      true,
			in:               "profiling_only.yaml",
			out:              []string{"profiling_only.yaml"},
		},
		{
			name:             "profiling_only_profiling_enabled",
			profilingEnabled: true,
			in:               "profiling_only.yaml",
			out:              []string{"profiling_only.yaml"},
		},
		{
			name:        "profiling_only_profiling_disabled",
			logsEnabled: true,
			in:          "profiling_only.yaml",
			out:         []string{},
		},
		{
			name:             "regular_logs_only_both_enabled",
			profilingEnabled: true,
			logsEnabled:      true,
			in:               "regular_logs_only.yaml",
			out:              []string{"regular_logs_only.yaml"},
		},
		{
			name:        "regular_logs_only_logs_enabled",
			logsEnabled: true,
			in:          "regular_logs_only.yaml",
			out:         []string{"regular_logs_only.yaml"},
		},
		{
			name:             "regular_logs_only_logs_disabled",
			profilingEnabled: true,
			in:               "regular_logs_only.yaml",
			out:              []string{},
		},
		{
			name:             "combined_both_enabled",
			profilingEnabled: true,
			logsEnabled:      true,
			in:               "combined.yaml",
			out:              []string{"regular_logs_only.yaml", "profiling_only.yaml"},
		},
		{
			name:        "combined_profiling_disabled",
			logsEnabled: true,
			in:          "combined.yaml",
			out:         []string{"regular_logs_only.yaml"},
		},
		{
			name:             "combined_logs_disabled",
			profilingEnabled: true,
			in:               "combined.yaml",
			out:              []string{"profiling_only.yaml"},
		},
		{
			name: "combined_both_disabled",
			in:   "combined.yaml",
			out:  []string{},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sink := &consumertest.LogsSink{}
			batcher := &perScopeBatcher{
				profilingEnabled: tt.profilingEnabled,
				logsEnabled:      tt.logsEnabled,
				next:             sink,
			}

			logs, err := golden.ReadLogs(testdataDir + tt.in)
			require.NoError(t, err)

			err = batcher.ConsumeLogs(context.Background(), logs)
			assert.NoError(t, err)

			// Use require, not assert: if the number of batches is wrong the subtest
			// must stop here rather than panic on an out-of-range index below.
			got := sink.AllLogs()
			require.Equal(t, len(tt.out), len(got))
			for i, out := range tt.out {
				expected, err := golden.ReadLogs(testdataDir + out)
				require.NoError(t, err)
				assert.NoError(t, plogtest.CompareLogs(expected, got[i]))
			}
		})
	}
}
3 changes: 3 additions & 0 deletions exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.77.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.77.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.77.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.77.0
github.com/stretchr/testify v1.8.2
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.77.0
Expand All @@ -22,6 +23,7 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand All @@ -37,6 +39,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.77.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.9.0 // indirect
go.opentelemetry.io/collector/featuregate v0.77.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 109 additions & 0 deletions exporter/splunkhecexporter/testdata/batchperscope/combined.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
resourceLogs:
- resource:
attributes:
- key: resource1_attribute1
value:
stringValue: value1
- key: resource2_attribute2
value:
stringValue: value2
scopeLogs:
- scope:
name: otel.profiling
logRecords:
- attributes:
- key: resource1_prof_scope_log1_attr1
value:
stringValue: value1
body:
stringValue: resource1_prof_scope_log1_body
timeUnixNano: "11651379494838206464"
- attributes:
- key: resource1_prof_scope_log1_attr1
value:
stringValue: value1
body:
stringValue: resource1_prof_scope_log2_body
timeUnixNano: "11651379494838206472"
- scope:
name: otel_collector
version: 0.1.0
logRecords:
- attributes:
- key: resource1_scope1_attr1
value:
stringValue: value1
body:
stringValue: resource1_scope1_body
timeUnixNano: "11651379494838207123"
- scope:
name: external_service
version: 0.2.0
logRecords:
- attributes:
- key: resource1_scope2_attr1
value:
stringValue: value1
body:
stringValue: resource1_log2_body
timeUnixNano: "11651379494838207153"
- resource:
attributes:
- key: resource2_attribute1
value:
stringValue: value1
scopeLogs:
- scope:
name: otel_collector
version: 0.1.0
logRecords:
- attributes:
- key: resource2_log1_attr1
value:
stringValue: value1
body:
stringValue: resource2_log2_body
timeUnixNano: "11651379494838209123"
- scope:
name: otel.profiling
logRecords:
- attributes:
- key: resource2_prof_scope_log1_attr1
value:
stringValue: value1
body:
stringValue: resource2_prof_scope_log1_body
timeUnixNano: "11651379494838206532"
- resource:
attributes:
- key: resource3_attribute1
value:
stringValue: value1
scopeLogs:
- scope:
name: otel_collector
version: 0.1.0
logRecords:
- attributes:
- key: resource3_log1_attr1
value:
stringValue: value1
body:
stringValue: resource3_log2_body
timeUnixNano: "11651379494838221123"
- resource:
attributes:
- key: resource4_attribute1
value:
stringValue: value1
scopeLogs:
- scope:
name: otel.profiling
logRecords:
- attributes:
- key: resource4_prof_scope_log1_attr1
value:
stringValue: value1
body:
stringValue: resource4_prof_scope_log1_body
timeUnixNano: "11651379494838214321"
Loading

0 comments on commit d9024f6

Please sign in to comment.