Skip to content

Commit

Permalink
[testbed] Fix loadgenerator race condition (open-telemetry#32351)
Browse files Browse the repository at this point in the history
Fixes a race condition in testbed's default load generator
(`ProviderSender`). The fix changes the behavior of how logging works
for the `ProviderSender`. Before this PR, the code will try to log
errors if the previous error is not the same and it would try to do this
across goroutines. Now, each error will be logged if the previous error
is not the same for each goroutine.

Alternatively, we can also build a logger using bloom filter to try to
log each error once though I am not sure if that would be required. This
PR offers a quick fix while keeping the behavior reasonably close to the
current behavior.

Closes
open-telemetry#32326

**Link to tracking Issue:**
open-telemetry#32326

**Testing:** Follow the steps in the tracking issue.
  • Loading branch information
lahsivjar committed May 1, 2024
1 parent 09b8182 commit 37f405b
Showing 1 changed file with 26 additions and 46 deletions.
72 changes: 26 additions & 46 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ type ProviderSender struct {

options LoadOptions

// Record information about previous errors to avoid flood of error messages.
prevErr error
sendType string
sendType string
generateFunc func() error
}

// NewLoadGenerator creates a ProviderSender to send DataProvider-generated telemetry via a DataSender.
Expand All @@ -90,12 +89,15 @@ func NewLoadGenerator(dataProvider DataProvider, sender DataSender) (LoadGenerat
switch t := ps.Sender.(type) {
case TraceDataSender:
ps.sendType = "traces"
ps.generateFunc = ps.generateTrace
case MetricDataSender:
ps.sendType = "metrics"
ps.generateFunc = ps.generateMetrics
case LogDataSender:
ps.sendType = "logs"
ps.generateFunc = ps.generateLog
default:
ps.sendType = fmt.Sprintf("invalid-%T", t)
return nil, fmt.Errorf("failed creating load generator, unhandled data type %T", t)
}

return ps, nil
Expand Down Expand Up @@ -210,19 +212,17 @@ func (ps *ProviderSender) generate() {
defer workers.Done()
t := time.NewTicker(time.Second / time.Duration(ps.options.DataItemsPerSecond/ps.options.ItemsPerBatch/numWorkers))
defer t.Stop()

var prevErr error
for {
select {
case <-t.C:
switch ps.Sender.(type) {
case TraceDataSender:
ps.generateTrace()
case MetricDataSender:
ps.generateMetrics()
case LogDataSender:
ps.generateLog()
default:
log.Printf("Invalid type of ProviderSender sender")
err := ps.generateFunc()
// log the error if it is different from the previous result
if err != nil && (prevErr == nil || err.Error() != prevErr.Error()) {
log.Printf("%v", err)
}
prevErr = err
case <-ps.stopSignal:
return
}
Expand All @@ -236,19 +236,18 @@ func (ps *ProviderSender) generate() {
ps.Sender.Flush()
}

func (ps *ProviderSender) generateTrace() {
func (ps *ProviderSender) generateTrace() error {
traceSender := ps.Sender.(TraceDataSender)

traceData, done := ps.Provider.GenerateTraces()
if done {
return
return nil
}

for {
err := traceSender.ConsumeTraces(context.Background(), traceData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -257,29 +256,22 @@ func (ps *ProviderSender) generateTrace() {
}

ps.permanentErrors.Add(uint64(traceData.SpanCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send traces: %v", err)
}
break
return fmt.Errorf("cannot send traces: %w", err)
}
}

func (ps *ProviderSender) generateMetrics() {
func (ps *ProviderSender) generateMetrics() error {
metricSender := ps.Sender.(MetricDataSender)

metricData, done := ps.Provider.GenerateMetrics()
if done {
return
return nil
}

for {
err := metricSender.ConsumeMetrics(context.Background(), metricData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -288,28 +280,22 @@ func (ps *ProviderSender) generateMetrics() {
}

ps.permanentErrors.Add(uint64(metricData.DataPointCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send metrics: %v", err)
}
break
return fmt.Errorf("cannot send metrics: %w", err)
}
}

func (ps *ProviderSender) generateLog() {
func (ps *ProviderSender) generateLog() error {
logSender := ps.Sender.(LogDataSender)

logData, done := ps.Provider.GenerateLogs()
if done {
return
return nil
}

for {
err := logSender.ConsumeLogs(context.Background(), logData)
if err == nil {
ps.prevErr = nil
break
return nil
}

if !consumererror.IsPermanent(err) {
Expand All @@ -318,12 +304,6 @@ func (ps *ProviderSender) generateLog() {
}

ps.permanentErrors.Add(uint64(logData.LogRecordCount()))

// update prevErr to err if it's different than last observed error
if ps.prevErr == nil || ps.prevErr.Error() != err.Error() {
ps.prevErr = err
log.Printf("Cannot send logs: %v", err)
}
break
return fmt.Errorf("cannot send logs: %w", err)
}
}

0 comments on commit 37f405b

Please sign in to comment.