forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reporter.go
99 lines (84 loc) · 3.1 KB
/
reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
import (
"context"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport"
)
// reporter struct implements the transport.Reporter interface to give consistent
// observability per Collector metric observability package.
type reporter struct {
logger *zap.Logger
sugaredLogger *zap.SugaredLogger // Used for generic debug logging
obsrecv *receiverhelper.ObsReport
}
var _ transport.Reporter = (*reporter)(nil)
func newReporter(set receiver.CreateSettings) (transport.Reporter, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: "tcp",
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
return &reporter{
logger: set.Logger,
sugaredLogger: set.Logger.Sugar(),
obsrecv: obsrecv,
}, nil
}
// OnDataReceived is called when a message or request is received from
// a client. The returned context should be used in other calls to the same
// reporter instance. The caller code should include a call to end the
// returned span.
func (r *reporter) OnDataReceived(ctx context.Context) context.Context {
return r.obsrecv.StartMetricsOp(ctx)
}
// OnTranslationError is used to report a translation error from original
// format to the internal format of the Collector. The context and span
// passed to it should be the ones returned by OnDataReceived.
func (r *reporter) OnTranslationError(ctx context.Context, err error) {
if err == nil {
return
}
r.logger.Debug("StatsD translation error", zap.Error(err))
// Using annotations since multiple translation errors can happen in the
// same client message/request. The time itself is not relevant.
span := trace.FromContext(ctx)
span.Annotate([]trace.Attribute{
trace.StringAttribute("error", err.Error())},
"translation",
)
}
// OnMetricsProcessed is called when the received data is passed to next
// consumer on the pipeline. The context and span passed to it should be the
// ones returned by OnDataReceived. The error should be error returned by
// the next consumer - the reporter is expected to handle nil error too.
func (r *reporter) OnMetricsProcessed(
ctx context.Context,
numReceivedMessages int,
err error,
) {
if err != nil {
r.logger.Debug(
"StatsD receiver failed to push metrics into pipeline",
zap.Int("numReceivedMessages", numReceivedMessages),
zap.Error(err))
span := trace.FromContext(ctx)
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: err.Error(),
})
}
r.obsrecv.EndMetricsOp(ctx, "statsd", numReceivedMessages, err)
}
func (r *reporter) OnDebugf(template string, args ...interface{}) {
if r.logger.Check(zap.DebugLevel, "debug") != nil {
r.sugaredLogger.Debugf(template, args...)
}
}