Skip to content

Commit

Permalink
[chore]: rename files in filterprocessor, use RemoveIf once (#16376)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 19, 2022
1 parent fc6980a commit b6831bd
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,51 +60,37 @@ func newFilterLogsProcessor(logger *zap.Logger, cfg *Config) (*filterLogProcesso
}, nil
}

func (flp *filterLogProcessor) ProcessLogs(ctx context.Context, logs plog.Logs) (plog.Logs, error) {
func (flp *filterLogProcessor) ProcessLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
rLogs := logs.ResourceLogs()

// Filter out logs
flp.filterLogRecords(rLogs)

if rLogs.Len() == 0 {
return logs, processorhelper.ErrSkipProcessingData
}

return logs, nil
}

func (flp *filterLogProcessor) filterLogRecords(rLogs plog.ResourceLogsSlice) {
for i := 0; i < rLogs.Len(); i++ {
rLog := rLogs.At(i)
resource := rLog.Resource()
scopes := rLog.ScopeLogs()

for j := 0; j < scopes.Len(); j++ {
scope := scopes.At(j)
instrumentationScope := scope.Scope()
lrs := scope.LogRecords()
rLogs.RemoveIf(func(rl plog.ResourceLogs) bool {
resource := rl.Resource()
rl.ScopeLogs().RemoveIf(func(sl plog.ScopeLogs) bool {
scope := sl.Scope()
lrs := sl.LogRecords()

if flp.includeMatcher != nil {
// If includeMatcher exists, remove all records that do not match the filter.
lrs.RemoveIf(func(lr plog.LogRecord) bool {
return !flp.includeMatcher.MatchLogRecord(lr, resource, instrumentationScope)
return !flp.includeMatcher.MatchLogRecord(lr, resource, scope)
})
}

if flp.excludeMatcher != nil {
// If excludeMatcher exists, remove all records that match the filter.
lrs.RemoveIf(func(lr plog.LogRecord) bool {
return flp.excludeMatcher.MatchLogRecord(lr, resource, instrumentationScope)
return flp.excludeMatcher.MatchLogRecord(lr, resource, scope)
})
}
}

scopes.RemoveIf(func(sl plog.ScopeLogs) bool {
return sl.LogRecords().Len() == 0
})
}

rLogs.RemoveIf(func(rl plog.ResourceLogs) bool {
return rl.ScopeLogs().Len() == 0
})

if rLogs.Len() == 0 {
return logs, processorhelper.ErrSkipProcessingData
}

return logs, nil
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle
import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"
Expand Down Expand Up @@ -87,41 +86,31 @@ func createSpanMatcher(cfg *Config) (filterspan.Matcher, filterspan.Matcher, err

// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt ptrace.Traces) (ptrace.Traces, error) {
for i := 0; i < pdt.ResourceSpans().Len(); i++ {
resSpan := pdt.ResourceSpans().At(i)
for x := 0; x < resSpan.ScopeSpans().Len(); x++ {
ils := resSpan.ScopeSpans().At(x)
ils.Spans().RemoveIf(func(span ptrace.Span) bool {
return fsp.shouldRemoveSpan(span, resSpan.Resource(), ils.Scope())
pdt.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
resource := rs.Resource()
rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
scope := ss.Scope()
ss.Spans().RemoveIf(func(span ptrace.Span) bool {
if fsp.include != nil {
if !fsp.include.MatchSpan(span, resource, scope) {
return true
}
}

if fsp.exclude != nil {
if fsp.exclude.MatchSpan(span, resource, scope) {
return true
}
}

return false
})
}
// Remove empty elements, that way if we delete everything we can tell
// the pipeline to stop processing completely (ErrSkipProcessingData)
resSpan.ScopeSpans().RemoveIf(func(ilsSpans ptrace.ScopeSpans) bool {
return ilsSpans.Spans().Len() == 0
return ss.Spans().Len() == 0
})
}
pdt.ResourceSpans().RemoveIf(func(res ptrace.ResourceSpans) bool {
return res.ScopeSpans().Len() == 0
return rs.ScopeSpans().Len() == 0
})
if pdt.ResourceSpans().Len() == 0 {
return pdt, processorhelper.ErrSkipProcessingData
}
return pdt, nil
}

func (fsp *filterSpanProcessor) shouldRemoveSpan(span ptrace.Span, resource pcommon.Resource, library pcommon.InstrumentationScope) bool {
if fsp.include != nil {
if !fsp.include.MatchSpan(span, resource, library) {
return true
}
}

if fsp.exclude != nil {
if fsp.exclude.MatchSpan(span, resource, library) {
return true
}
}

return false
}
File renamed without changes.

0 comments on commit b6831bd

Please sign in to comment.