Skip to content

Commit

Permalink
Fix panic when LogRecordCount is called after ConsumeLogs (open-telem…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Nov 14, 2023
1 parent f23afae commit 98124e0
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 10 deletions.
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix-3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: lokireceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix-4.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix-5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix-6.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fluentforwardreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions .chloggen/consume-logs-race-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otlpjsonfilereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where counting number of logs emitted could cause panic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27469, 29107]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ func (r *receiver) consumerLoop(ctx context.Context) {
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", pLogs.LogRecordCount(), cErr)
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions receiver/fluentforwardreceiver/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ func (c *Collector) processEvents(ctx context.Context) {
// efficiency on LogResource allocations.
c.fillBufferUntilChanEmpty(logSlice)

stats.Record(context.Background(), observ.RecordsGenerated.M(int64(out.LogRecordCount())))
logRecordCount := out.LogRecordCount()
stats.Record(context.Background(), observ.RecordsGenerated.M(int64(logRecordCount)))
obsCtx := c.obsrecv.StartLogsOp(ctx)
err := c.nextConsumer.ConsumeLogs(obsCtx, out)
c.obsrecv.EndLogsOp(obsCtx, "fluent", out.LogRecordCount(), err)
c.obsrecv.EndLogsOp(obsCtx, "fluent", logRecordCount, err)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
} else if len(objects.Items) > 0 {
logs := pullObjectsToLogData(objects, time.Now(), config)
obsCtx := kr.obsrecv.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = kr.consumer.ConsumeLogs(obsCtx, logs)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logs.LogRecordCount(), err)
kr.obsrecv.EndLogsOp(obsCtx, metadata.Type, logRecordCount, err)
}
case <-stopperChan:
return
Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,9 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
return err
}
c.headerExtractor.extractHeadersLogs(logs, message)
logRecordCount := logs.LogRecordCount()
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
if err != nil {
if c.messageMarking.After && c.messageMarking.OnError {
session.MarkMessage(message, "")
Expand Down
6 changes: 4 additions & 2 deletions receiver/lokireceiver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,9 @@ func (r *lokiReceiver) Push(ctx context.Context, pushRequest *push.PushRequest)
return &push.PushResponse{}, err
}
ctx = r.obsrepGRPC.StartLogsOp(ctx)
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logs.LogRecordCount(), err)
r.obsrepGRPC.EndLogsOp(ctx, "protobuf", logRecordCount, err)
return &push.PushResponse{}, nil
}

Expand Down Expand Up @@ -218,8 +219,9 @@ func handleLogs(resp http.ResponseWriter, req *http.Request, r *lokiReceiver) {
return
}
ctx := r.obsrepHTTP.StartLogsOp(req.Context())
logRecordCount := logs.LogRecordCount()
err = r.nextConsumer.ConsumeLogs(ctx, logs)
r.obsrepHTTP.EndLogsOp(ctx, "json", logs.LogRecordCount(), err)
r.obsrepHTTP.EndLogsOp(ctx, "json", logRecordCount, err)

resp.WriteHeader(http.StatusNoContent)
}
5 changes: 3 additions & 2 deletions receiver/otlpjsonfilereceiver/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, con
if err != nil {
obsrecv.EndLogsOp(ctx, metadata.Type, 0, err)
} else {
if l.ResourceLogs().Len() != 0 {
logRecordCount := l.LogRecordCount()
if logRecordCount != 0 {
err = logs.ConsumeLogs(ctx, l)
}
obsrecv.EndLogsOp(ctx, metadata.Type, l.LogRecordCount(), err)
obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err)
}
return nil
})
Expand Down

0 comments on commit 98124e0

Please sign in to comment.