forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
receiver.go
144 lines (120 loc) · 4.36 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer"
)
var _ receiver.Metrics = (*receiverCreator)(nil)
// receiverCreator implements consumer.Metrics.
type receiverCreator struct {
params receiver.CreateSettings
cfg *Config
nextLogsConsumer consumer.Logs
nextMetricsConsumer consumer.Metrics
nextTracesConsumer consumer.Traces
observerHandler *observerHandler
observables []observer.Observable
}
// newLogsReceiverCreator creates the receiver_creator with the given parameters.
func newLogsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Logs) (receiver.Logs, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}
r := &receiverCreator{
params: params,
cfg: cfg,
nextLogsConsumer: nextConsumer,
}
return r, nil
}
// newMetricsReceiverCreator creates the receiver_creator with the given parameters.
func newMetricsReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}
r := &receiverCreator{
params: params,
cfg: cfg,
nextMetricsConsumer: nextConsumer,
}
return r, nil
}
// newTracesReceiverCreator creates the receiver_creator with the given parameters.
func newTracesReceiverCreator(params receiver.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (receiver.Traces, error) {
if nextConsumer == nil {
return nil, component.ErrNilNextConsumer
}
r := &receiverCreator{
params: params,
cfg: cfg,
nextTracesConsumer: nextConsumer,
}
return r, nil
}
// loggingHost provides a safer version of host that logs errors instead of exiting the process.
type loggingHost struct {
component.Host
logger *zap.Logger
}
// ReportFatalError causes a log to be made instead of terminating the process as Host does by default.
func (h *loggingHost) ReportFatalError(err error) {
h.logger.Error("receiver reported a fatal error", zap.Error(err))
}
var _ component.Host = (*loggingHost)(nil)
// Start receiver_creator.
func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
rc.observerHandler = &observerHandler{
config: rc.cfg,
params: rc.params,
receiversByEndpointID: receiverMap{},
nextLogsConsumer: rc.nextLogsConsumer,
nextMetricsConsumer: rc.nextMetricsConsumer,
nextTracesConsumer: rc.nextTracesConsumer,
runner: newReceiverRunner(rc.params, &loggingHost{host, rc.params.Logger}),
}
observers := map[component.ID]observer.Observable{}
// Match all configured observables to the extensions that are running.
for _, watchObserver := range rc.cfg.WatchObservers {
for cid, ext := range host.GetExtensions() {
if cid != watchObserver {
continue
}
obs, ok := ext.(observer.Observable)
if !ok {
return fmt.Errorf("extension %q in watch_observers is not an observer", watchObserver.String())
}
observers[watchObserver] = obs
}
}
// Make sure all observables are present before starting any.
for _, watchObserver := range rc.cfg.WatchObservers {
if observers[watchObserver] == nil {
return fmt.Errorf("failed to find observer %q in the extensions list", watchObserver.String())
}
}
if len(observers) == 0 {
rc.params.Logger.Warn("no observers were configured and no subreceivers will be started. receiver_creator will be disabled")
}
// Start all configured watchers.
for _, observable := range observers {
rc.observables = append(rc.observables, observable)
observable.ListAndWatch(rc.observerHandler)
}
return nil
}
// Shutdown stops the receiver_creator and all its receivers started at runtime.
func (rc *receiverCreator) Shutdown(context.Context) error {
for _, observable := range rc.observables {
observable.Unsubscribe(rc.observerHandler)
}
if rc.observerHandler == nil {
return nil
}
return rc.observerHandler.shutdown()
}