Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change obsreport receiver calling pattern in signalfxreceiver, splunkhecreceiver, statsdreceiver, and redisreceiver #3521

Merged
merged 21 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
78fc0c3
Make changes to awsxrayreceiver
humivo May 20, 2021
0b776eb
Merge branch 'open-telemetry:main' into 3436-ChangeReceiverPattern
humivo May 20, 2021
76ec18f
Merge branch 'main' of github.com:humivo/opentelemetry-collector-cont…
humivo May 20, 2021
f0fa967
Make changes to carbonreceiver
humivo May 20, 2021
9623b26
Merge branch '3436-ChangeReceiverPattern' of github.com:humivo/opente…
humivo May 20, 2021
9e8b821
Add entry to changelog
humivo May 20, 2021
1e5d622
Merge branch 'open-telemetry:main' into 3436-ChangeReceiverPattern
humivo May 21, 2021
ee598f4
Make changes to dockerstatsreceiver
humivo May 21, 2021
f365af4
Add entry to changelog
humivo May 21, 2021
17816ce
Merge branch 'open-telemetry:main' into 3436-ChangeReceiverPattern
humivo May 24, 2021
0ab12a2
Merge branch 'open-telemetry:main' into 3436-ChangeReceiverPattern
humivo May 24, 2021
a163738
Makes changes to receiver pattern
humivo May 24, 2021
b2a70ed
Merge branch '3436-ChangeReceiverPattern' of github.com:humivo/opente…
humivo May 24, 2021
8f54b4f
Merge branch 'main' into 3436-ChangeReceiverPattern
humivo May 25, 2021
339506a
Merge branch 'open-telemetry:main' into 3436-ChangeReceiverPattern
humivo May 25, 2021
6b2a7b1
Fix test issue
humivo May 25, 2021
446068b
Add entry intro changelog
humivo May 25, 2021
5a1d73b
Change receiver pattern for rest of receivers
humivo May 25, 2021
fb959cf
Add entry in changelog
humivo May 25, 2021
8c9a9fd
Merge branch 'main' into 3436-ChangeReceiverPatternRest
humivo May 25, 2021
7a601cf
Merge branch 'main' into 3436-ChangeReceiverPatternRest
humivo May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ The OpenTelemetry Collector Contrib contains everything in the [opentelemetry-co
- `groupbytrace` processor: Added workers for queue processing (#2902)
- `resourcedetection` processor: Add docker detector (#2775)
- `tailsampling` processor: Support regex on span attribute filtering (#3335_
- Change obsreport helpers for receiver to use the new pattern created in Collector (#3439,#3443,#3449,#3504)
- Change obsreport helpers for receiver to use the new pattern created in Collector (#3439,#3443,#3449,#3504,#3521)

## 🧰 Bug fixes 🧰

Expand Down
13 changes: 8 additions & 5 deletions receiver/redisreceiver/redis_runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

var _ interval.Runnable = (*redisRunnable)(nil)

const transport = "http" // todo verify this

// Runs intermittently, fetching info from Redis, creating metrics/datapoints,
// and feeding them to a metricsConsumer.
type redisRunnable struct {
Expand All @@ -40,6 +42,7 @@ type redisRunnable struct {
logger *zap.Logger
timeBundle *timeBundle
serviceName string
obsrecv *obsreport.Receiver
}

func newRedisRunnable(
Expand All @@ -57,6 +60,7 @@ func newRedisRunnable(
redisSvc: newRedisSvc(client),
metricsConsumer: metricsConsumer,
logger: logger,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: transport}),
}
}

Expand All @@ -74,18 +78,17 @@ func (r *redisRunnable) Setup() error {
// active Redis database, of which there can be 16.
func (r *redisRunnable) Run() error {
const dataFormat = "redis"
const transport = "http" // todo verify this
ctx := obsreport.StartMetricsReceiveOp(r.ctx, r.id, transport)
ctx := r.obsrecv.StartMetricsReceiveOp(r.ctx)

inf, err := r.redisSvc.info()
if err != nil {
obsreport.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
return nil
}

uptime, err := inf.getUptimeInSeconds()
if err != nil {
obsreport.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
return nil
}

Expand Down Expand Up @@ -121,7 +124,7 @@ func (r *redisRunnable) Run() error {

err = r.metricsConsumer.ConsumeMetrics(r.ctx, pdm)
_, numPoints := pdm.MetricAndDataPointCount()
obsreport.EndMetricsReceiveOp(ctx, dataFormat, numPoints, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, numPoints, err)

return nil
}
22 changes: 14 additions & 8 deletions receiver/signalfxreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type sfxReceiver struct {
metricsConsumer consumer.Metrics
logsConsumer consumer.Logs
server *http.Server
obsrecv *obsreport.Receiver
}

var _ component.MetricsReceiver = (*sfxReceiver)(nil)
Expand All @@ -93,9 +94,14 @@ func newReceiver(
logger *zap.Logger,
config Config,
) *sfxReceiver {
transport := "http"
if config.TLSSetting != nil {
transport = "https"
}
r := &sfxReceiver{
logger: logger,
config: &config,
logger: logger,
config: &config,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: config.ID(), Transport: transport}),
}

return r
Expand Down Expand Up @@ -212,7 +218,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)

if r.metricsConsumer == nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errMetricsNotConfigured, nil)
Expand All @@ -231,7 +237,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

if len(msg.Datapoints) == 0 {
obsreport.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
resp.Write(okRespBody)
return
}
Expand All @@ -249,7 +255,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

err := r.metricsConsumer.ConsumeMetrics(ctx, md)
obsreport.EndMetricsReceiveOp(
r.obsrecv.EndMetricsReceiveOp(
ctx,
typeStr,
len(msg.Datapoints),
Expand All @@ -265,7 +271,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)

if r.logsConsumer == nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errLogsNotConfigured, nil)
Expand All @@ -284,7 +290,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

if len(msg.Events) == 0 {
obsreport.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
resp.Write(okRespBody)
return
}
Expand All @@ -301,7 +307,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

err := r.logsConsumer.ConsumeLogs(ctx, ld)
obsreport.EndMetricsReceiveOp(
r.obsrecv.EndMetricsReceiveOp(
ctx,
typeStr,
len(msg.Events),
Expand Down
11 changes: 9 additions & 2 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type splunkReceiver struct {
logsConsumer consumer.Logs
metricsConsumer consumer.Metrics
server *http.Server
obsrecv *obsreport.Receiver
}

var _ component.MetricsReceiver = (*splunkReceiver)(nil)
Expand All @@ -97,6 +98,11 @@ func NewMetricsReceiver(
return nil, errEmptyEndpoint
}

transport := "http"
if config.TLSSetting != nil {
transport = "https"
}

r := &splunkReceiver{
logger: logger,
config: &config,
Expand All @@ -108,6 +114,7 @@ func NewMetricsReceiver(
ReadHeaderTimeout: defaultServerTimeout,
WriteTimeout: defaultServerTimeout,
},
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: config.ID(), Transport: transport}),
}

return r, nil
Expand Down Expand Up @@ -196,7 +203,7 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
if r.logsConsumer == nil {
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)
}
reqPath := req.URL.Path
if !r.config.pathGlob.Match(reqPath) {
Expand Down Expand Up @@ -275,7 +282,7 @@ func (r *splunkReceiver) consumeMetrics(ctx context.Context, events []*splunk.Ev
md, _ := SplunkHecToMetricsData(r.logger, events, r.createResourceCustomizer(req))

decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
obsreport.EndMetricsReceiveOp(ctx, typeStr, len(events), decodeErr)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, len(events), decodeErr)

if decodeErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
Expand Down
6 changes: 4 additions & 2 deletions receiver/statsdreceiver/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type reporter struct {
id config.ComponentID
logger *zap.Logger
sugaredLogger *zap.SugaredLogger // Used for generic debug logging
obsrecv *obsreport.Receiver
}

var _ transport.Reporter = (*reporter)(nil)
Expand All @@ -40,6 +41,7 @@ func newReporter(receiverID config.ComponentID, logger *zap.Logger) transport.Re
id: receiverID,
logger: logger,
sugaredLogger: logger.Sugar(),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiverID, Transport: "tcp"}),
}
}

Expand All @@ -49,7 +51,7 @@ func newReporter(receiverID config.ComponentID, logger *zap.Logger) transport.Re
// returned span.
func (r *reporter) OnDataReceived(ctx context.Context) context.Context {
ctx = obsreport.ReceiverContext(ctx, r.id, "tcp")
return obsreport.StartMetricsReceiveOp(ctx, r.id, "tcp")
return r.obsrecv.StartMetricsReceiveOp(ctx)
}

// OnTranslationError is used to report a translation error from original
Expand Down Expand Up @@ -93,7 +95,7 @@ func (r *reporter) OnMetricsProcessed(
})
}

obsreport.EndMetricsReceiveOp(ctx, "statsd", numReceivedMessages, err)
r.obsrecv.EndMetricsReceiveOp(ctx, "statsd", numReceivedMessages, err)
}

func (r *reporter) OnDebugf(template string, args ...interface{}) {
Expand Down