Skip to content

Commit

Permalink
Change obsreport receiver calling pattern in signalfxreceiver, splunk…
Browse files Browse the repository at this point in the history
…hecreceiver, statsdreceiver, and redisreceiver (open-telemetry#3521)

* Make changes to awsxrayreceiver

* Make changes to carbonreceiver

* Add entry to changelog

* Make changes to dockerstatsreceiver

* Add entry to changelog

* Makes changes to receiver pattern

* Fix test issue

* Add entry intro changelog

* Change receiver pattern for rest of receivers

* Add entry in changelog
  • Loading branch information
humivo authored and Mark Stumpf committed Aug 31, 2021
1 parent 09dde49 commit a065fa3
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The OpenTelemetry Collector Contrib contains everything in the [opentelemetry-co
- `groupbytrace` processor: Added workers for queue processing (#2902)
- `resourcedetection` processor: Add docker detector (#2775)
- `tailsampling` processor: Support regex on span attribute filtering (#3335_
- Change obsreport helpers for receiver to use the new pattern created in Collector (#3439,#3443,#3449,#3504)
- Change obsreport helpers for receiver to use the new pattern created in Collector (#3439,#3443,#3449,#3504,#3521)

## 🧰 Bug fixes 🧰

Expand Down
13 changes: 8 additions & 5 deletions receiver/redisreceiver/redis_runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

var _ interval.Runnable = (*redisRunnable)(nil)

const transport = "http" // todo verify this

// Runs intermittently, fetching info from Redis, creating metrics/datapoints,
// and feeding them to a metricsConsumer.
type redisRunnable struct {
Expand All @@ -40,6 +42,7 @@ type redisRunnable struct {
logger *zap.Logger
timeBundle *timeBundle
serviceName string
obsrecv *obsreport.Receiver
}

func newRedisRunnable(
Expand All @@ -57,6 +60,7 @@ func newRedisRunnable(
redisSvc: newRedisSvc(client),
metricsConsumer: metricsConsumer,
logger: logger,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: transport}),
}
}

Expand All @@ -74,18 +78,17 @@ func (r *redisRunnable) Setup() error {
// active Redis database, of which there can be 16.
func (r *redisRunnable) Run() error {
const dataFormat = "redis"
const transport = "http" // todo verify this
ctx := obsreport.StartMetricsReceiveOp(r.ctx, r.id, transport)
ctx := r.obsrecv.StartMetricsReceiveOp(r.ctx)

inf, err := r.redisSvc.info()
if err != nil {
obsreport.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
return nil
}

uptime, err := inf.getUptimeInSeconds()
if err != nil {
obsreport.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, 0, err)
return nil
}

Expand Down Expand Up @@ -121,7 +124,7 @@ func (r *redisRunnable) Run() error {

err = r.metricsConsumer.ConsumeMetrics(r.ctx, pdm)
_, numPoints := pdm.MetricAndDataPointCount()
obsreport.EndMetricsReceiveOp(ctx, dataFormat, numPoints, err)
r.obsrecv.EndMetricsReceiveOp(ctx, dataFormat, numPoints, err)

return nil
}
22 changes: 14 additions & 8 deletions receiver/signalfxreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type sfxReceiver struct {
metricsConsumer consumer.Metrics
logsConsumer consumer.Logs
server *http.Server
obsrecv *obsreport.Receiver
}

var _ component.MetricsReceiver = (*sfxReceiver)(nil)
Expand All @@ -93,9 +94,14 @@ func newReceiver(
logger *zap.Logger,
config Config,
) *sfxReceiver {
transport := "http"
if config.TLSSetting != nil {
transport = "https"
}
r := &sfxReceiver{
logger: logger,
config: &config,
logger: logger,
config: &config,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: config.ID(), Transport: transport}),
}

return r
Expand Down Expand Up @@ -212,7 +218,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)

if r.metricsConsumer == nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errMetricsNotConfigured, nil)
Expand All @@ -231,7 +237,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

if len(msg.Datapoints) == 0 {
obsreport.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
resp.Write(okRespBody)
return
}
Expand All @@ -249,7 +255,7 @@ func (r *sfxReceiver) handleDatapointReq(resp http.ResponseWriter, req *http.Req
}

err := r.metricsConsumer.ConsumeMetrics(ctx, md)
obsreport.EndMetricsReceiveOp(
r.obsrecv.EndMetricsReceiveOp(
ctx,
typeStr,
len(msg.Datapoints),
Expand All @@ -265,7 +271,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)

if r.logsConsumer == nil {
r.failRequest(ctx, resp, http.StatusBadRequest, errLogsNotConfigured, nil)
Expand All @@ -284,7 +290,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

if len(msg.Events) == 0 {
obsreport.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, 0, nil)
resp.Write(okRespBody)
return
}
Expand All @@ -301,7 +307,7 @@ func (r *sfxReceiver) handleEventReq(resp http.ResponseWriter, req *http.Request
}

err := r.logsConsumer.ConsumeLogs(ctx, ld)
obsreport.EndMetricsReceiveOp(
r.obsrecv.EndMetricsReceiveOp(
ctx,
typeStr,
len(msg.Events),
Expand Down
11 changes: 9 additions & 2 deletions receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type splunkReceiver struct {
logsConsumer consumer.Logs
metricsConsumer consumer.Metrics
server *http.Server
obsrecv *obsreport.Receiver
}

var _ component.MetricsReceiver = (*splunkReceiver)(nil)
Expand All @@ -97,6 +98,11 @@ func NewMetricsReceiver(
return nil, errEmptyEndpoint
}

transport := "http"
if config.TLSSetting != nil {
transport = "https"
}

r := &splunkReceiver{
logger: logger,
config: &config,
Expand All @@ -108,6 +114,7 @@ func NewMetricsReceiver(
ReadHeaderTimeout: defaultServerTimeout,
WriteTimeout: defaultServerTimeout,
},
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: config.ID(), Transport: transport}),
}

return r, nil
Expand Down Expand Up @@ -196,7 +203,7 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)

ctx := obsreport.ReceiverContext(req.Context(), r.config.ID(), transport)
if r.logsConsumer == nil {
ctx = obsreport.StartMetricsReceiveOp(ctx, r.config.ID(), transport)
ctx = r.obsrecv.StartMetricsReceiveOp(ctx)
}
reqPath := req.URL.Path
if !r.config.pathGlob.Match(reqPath) {
Expand Down Expand Up @@ -275,7 +282,7 @@ func (r *splunkReceiver) consumeMetrics(ctx context.Context, events []*splunk.Ev
md, _ := SplunkHecToMetricsData(r.logger, events, r.createResourceCustomizer(req))

decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
obsreport.EndMetricsReceiveOp(ctx, typeStr, len(events), decodeErr)
r.obsrecv.EndMetricsReceiveOp(ctx, typeStr, len(events), decodeErr)

if decodeErr != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
Expand Down
6 changes: 4 additions & 2 deletions receiver/statsdreceiver/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type reporter struct {
id config.ComponentID
logger *zap.Logger
sugaredLogger *zap.SugaredLogger // Used for generic debug logging
obsrecv *obsreport.Receiver
}

var _ transport.Reporter = (*reporter)(nil)
Expand All @@ -40,6 +41,7 @@ func newReporter(receiverID config.ComponentID, logger *zap.Logger) transport.Re
id: receiverID,
logger: logger,
sugaredLogger: logger.Sugar(),
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiverID, Transport: "tcp"}),
}
}

Expand All @@ -49,7 +51,7 @@ func newReporter(receiverID config.ComponentID, logger *zap.Logger) transport.Re
// returned span.
func (r *reporter) OnDataReceived(ctx context.Context) context.Context {
ctx = obsreport.ReceiverContext(ctx, r.id, "tcp")
return obsreport.StartMetricsReceiveOp(ctx, r.id, "tcp")
return r.obsrecv.StartMetricsReceiveOp(ctx)
}

// OnTranslationError is used to report a translation error from original
Expand Down Expand Up @@ -93,7 +95,7 @@ func (r *reporter) OnMetricsProcessed(
})
}

obsreport.EndMetricsReceiveOp(ctx, "statsd", numReceivedMessages, err)
r.obsrecv.EndMetricsReceiveOp(ctx, "statsd", numReceivedMessages, err)
}

func (r *reporter) OnDebugf(template string, args ...interface{}) {
Expand Down

0 comments on commit a065fa3

Please sign in to comment.