forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logs.go
119 lines (101 loc) · 2.78 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package routingconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector"
import (
"context"
"errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
)
// logsConnector is the logs flavour of the routing connector. It evaluates
// the configured routes against incoming resource logs and hands each one to
// the consumers selected by the router.
type logsConnector struct {
	// Embedded lifecycle hooks; newLogsConnector leaves both at their zero
	// value. NOTE(review): presumably these supply the component's Start and
	// Shutdown methods — confirm against the component package.
	component.StartFunc
	component.ShutdownFunc

	// logger is the logger taken from the connector's telemetry settings.
	logger *zap.Logger
	// config is the connector configuration; ConsumeLogs reads ErrorMode and
	// MatchOnce from it.
	config *Config
	// router holds the ordered routes (routeSlice) and the default consumer
	// used when routing logs.
	router *router[consumer.Logs]
}
// newLogsConnector builds a logsConnector from the given settings and
// configuration.
//
// config must be a *Config; the type assertion is unchecked and panics
// otherwise. logs must implement connector.LogsRouterAndConsumer, or
// errUnexpectedConsumer is returned. Any error from newRouter is returned
// unchanged.
func newLogsConnector(
	set connector.CreateSettings,
	config component.Config,
	logs consumer.Logs,
) (*logsConnector, error) {
	cfg := config.(*Config)

	// The router needs to look up downstream consumers, which only a
	// LogsRouterAndConsumer can do.
	routerAndConsumer, isRouter := logs.(connector.LogsRouterAndConsumer)
	if !isRouter {
		return nil, errUnexpectedConsumer
	}

	telemetry := set.TelemetrySettings
	rt, err := newRouter(cfg.Table, cfg.DefaultPipelines, routerAndConsumer.Consumer, telemetry)
	if err != nil {
		return nil, err
	}

	lc := &logsConnector{
		logger: telemetry.Logger,
		config: cfg,
		router: rt,
	}
	return lc, nil
}
// Capabilities reports the connector's consumer capabilities: MutatesData is
// false.
func (c *logsConnector) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	caps.MutatesData = false
	return caps
}
// ConsumeLogs routes every plog.ResourceLogs in ld to the consumers of the
// routes whose statement matches its resource, then forwards one batch per
// consumer.
//
// Resource logs destined for the same consumer are grouped into a single
// plog.Logs, so each consumer is invoked once per call rather than once per
// resource.
//
// For each resource:
//   - routes are evaluated in c.router.routeSlice order;
//   - when c.config.MatchOnce is set, evaluation stops at the first match;
//   - when a statement fails and c.config.ErrorMode is ottl.PropagateError,
//     the error is returned immediately and nothing is forwarded; otherwise
//     the resource falls back to the default consumer and evaluation
//     continues with the next route;
//   - when no route matches, the resource falls back to the default consumer.
//
// The default-consumer fallback is applied at most once per resource, no
// matter how many statements fail for it, so the default consumer never
// receives duplicate copies of the same resource logs.
//
// The returned error joins the errors reported by the downstream consumers.
func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	groups := make(map[consumer.Logs]plog.Logs)

	resourceLogs := ld.ResourceLogs()
	for i := 0; i < resourceLogs.Len(); i++ {
		rlogs := resourceLogs.At(i)
		rtx := ottlresource.NewTransformContext(rlogs.Resource())

		noRoutesMatch := true
		// sentToDefault records that this resource has already been copied
		// into the default consumer's batch. Without it a failing statement
		// followed by "no route matched" (or several failing statements)
		// would copy the same resource logs more than once.
		sentToDefault := false

		for _, route := range c.router.routeSlice {
			_, isMatch, err := route.statement.Execute(ctx, rtx)
			if err != nil {
				if c.config.ErrorMode == ottl.PropagateError {
					return err
				}
				if !sentToDefault {
					c.group(groups, c.router.defaultConsumer, rlogs)
					sentToDefault = true
				}
				continue
			}
			if !isMatch {
				continue
			}

			noRoutesMatch = false
			c.group(groups, route.consumer, rlogs)
			if c.config.MatchOnce {
				break
			}
		}

		if noRoutesMatch && !sentToDefault {
			// No route condition matched: add the resource logs to the
			// default consumer's batch.
			c.group(groups, c.router.defaultConsumer, rlogs)
		}
	}

	var errs error
	for logsConsumer, batch := range groups {
		errs = errors.Join(errs, logsConsumer.ConsumeLogs(ctx, batch))
	}
	return errs
}
// group appends a copy of logs to the batch accumulated for consumer in
// groups, creating the batch on first use. A nil consumer is ignored and
// groups is left untouched.
func (c *logsConnector) group(
	groups map[consumer.Logs]plog.Logs,
	consumer consumer.Logs,
	logs plog.ResourceLogs,
) {
	if consumer == nil {
		return
	}

	batch, seen := groups[consumer]
	if !seen {
		batch = plog.NewLogs()
	}

	// Copy rather than move so the incoming resource logs stay intact.
	slot := batch.ResourceLogs().AppendEmpty()
	logs.CopyTo(slot)

	groups[consumer] = batch
}