forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
receiver.go
125 lines (108 loc) · 3.83 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package awsxrayreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver"
import (
	"context"
	"errors"
	"fmt"
	"net/http"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/receiverhelper"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/proxy"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/xray/telemetry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver/internal/translator"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver/internal/udppoller"
)
// maxPollerCount is the number of goroutines polling the UDP socket.
// See the AWS X-Ray daemon configuration for reference:
// https://github.com/aws/aws-xray-daemon/blob/master/pkg/cfg/cfg.go#L184
const maxPollerCount = 2
// xrayReceiver implements the receiver.Traces interface for converting
// AWS X-Ray segment document into the OT internal trace format.
type xrayReceiver struct {
poller udppoller.Poller
server proxy.Server
settings receiver.CreateSettings
consumer consumer.Traces
obsrecv *receiverhelper.ObsReport
registry telemetry.Registry
}
// newReceiver builds an xrayReceiver from config.
//
// It creates, in order, the UDP poller for config.Endpoint, the proxy server
// described by config.ProxyServer, and the observability reporter. The first
// step that fails aborts construction and its error is returned unchanged.
// Nothing is started here; that happens in Start.
func newReceiver(config *Config,
	consumer consumer.Traces,
	set receiver.CreateSettings,
) (receiver.Traces, error) {
	// Both log lines carry the same field: transport name -> endpoint.
	endpointField := zap.String(udppoller.Transport, config.Endpoint)

	set.Logger.Info("Going to listen on endpoint for X-Ray segments", endpointField)
	pollerCfg := &udppoller.Config{
		Transport:          string(config.Transport),
		Endpoint:           config.Endpoint,
		NumOfPollerToStart: maxPollerCount,
	}
	segPoller, pollerErr := udppoller.New(pollerCfg, set)
	if pollerErr != nil {
		return nil, pollerErr
	}
	set.Logger.Info("Listening on endpoint for X-Ray segments", endpointField)

	proxySrv, proxyErr := proxy.NewServer(config.ProxyServer, set.Logger)
	if proxyErr != nil {
		return nil, proxyErr
	}

	reportSettings := receiverhelper.ObsReportSettings{
		ReceiverID:             set.ID,
		Transport:              udppoller.Transport,
		ReceiverCreateSettings: set,
	}
	report, reportErr := receiverhelper.NewObsReport(reportSettings)
	if reportErr != nil {
		return nil, reportErr
	}

	rcvr := &xrayReceiver{
		poller:   segPoller,
		server:   proxySrv,
		settings: set,
		consumer: consumer,
		obsrecv:  report,
		registry: telemetry.GlobalRegistry(),
	}
	return rcvr, nil
}
// Start starts the poller with ctx, then launches the segment-processing
// loop and the proxy server, each in its own goroutine.
//
// It always returns nil: the proxy server runs asynchronously, so a failure
// to serve cannot be returned to the caller. Such failures are logged at
// error level instead of being silently dropped.
func (x *xrayReceiver) Start(ctx context.Context, _ component.Host) error {
	// TODO: Might want to use the component host to report a fatal error
	// instead of only logging when the proxy server fails.
	x.poller.Start(ctx)
	go x.start()
	go func() {
		err := x.server.ListenAndServe()
		// http.ErrServerClosed is what a net/http server returns after a
		// graceful Shutdown, so it is not a failure.
		// NOTE(review): assumes proxy.Server follows net/http semantics — confirm.
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			x.settings.Logger.Error("X-Ray TCP proxy server stopped unexpectedly", zap.Error(err))
		}
	}()
	x.settings.Logger.Info("X-Ray TCP proxy server started")
	return nil
}
// Shutdown closes the poller and then shuts down the proxy server using ctx.
// Both steps are always attempted, even when the first one fails.
//
// It returns nil when both succeed. Otherwise each failure is wrapped with a
// message naming the step, and a proxy failure is joined onto any poller
// failure.
func (x *xrayReceiver) Shutdown(ctx context.Context) error {
	var result error

	pollerErr := x.poller.Close()
	if pollerErr != nil {
		result = fmt.Errorf("failed to close poller: %w", pollerErr)
	}

	proxyErr := x.server.Shutdown(ctx)
	if proxyErr == nil {
		return result
	}
	return errors.Join(result, fmt.Errorf("failed to close proxy: %w", proxyErr))
}
// start is the segment-processing loop. It receives from the poller's
// segments channel until that channel is closed, and for every segment:
//
//  1. opens a traces operation on the observability reporter,
//  2. translates the raw payload into traces,
//  3. passes the traces to the consumer,
//  4. closes the operation with the span count and the error, if any.
//
// A failure in step 2 or 3 is logged as a warning and the loop moves on to
// the next segment; it never stops the loop.
func (x *xrayReceiver) start() {
	for segment := range x.poller.SegmentsChan() {
		opCtx := x.obsrecv.StartTracesOp(segment.Ctx)

		traces, spanCount, err := translator.ToTraces(segment.Payload, x.registry.LoadOrNop(x.settings.ID))
		if err != nil {
			x.settings.Logger.Warn("X-Ray segment to OT traces conversion failed", zap.Error(err))
		} else if err = x.consumer.ConsumeTraces(opCtx, traces); err != nil {
			x.settings.Logger.Warn("Trace consumer errored out", zap.Error(err))
		}

		// err is nil here only when both translation and consumption succeeded.
		x.obsrecv.EndTracesOp(opCtx, metadata.Type.String(), spanCount, err)
	}
}