[exporter/splunkhec] apply multimetric metric merge for the whole batch (open-telemetry#23366)

Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.
atoulme authored and fchikwekwe committed Jun 23, 2023
1 parent fb48a31 commit f940b15
Showing 5 changed files with 352 additions and 57 deletions.
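For context: in Splunk's multi-metric HEC format a single event can, roughly speaking, carry several metric_name:<name> fields, so datapoints that share a timestamp and dimensions collapse into one payload; this commit moves that merge from per-metric to per-batch. A minimal collector configuration exercising the changed code path might look like the sketch below. The YAML keys use_multi_metric_format and max_content_length_metrics are assumed here to be the config counterparts of the UseMultiMetricFormat and MaxContentLengthMetrics fields visible in the diff; the endpoint and token are placeholders.

# Hypothetical config sketch; values are illustrative and not part of this commit.
exporters:
  splunk_hec:
    token: "00000000-0000-0000-0000-000000000000"
    endpoint: "https://splunk.example.com:8088/services/collector"
    # Opt in to the multi-metric path; with this change the merge happens once per batch.
    use_multi_metric_format: true
    # Upper bound, in bytes, on one batch's JSON payload (MaxContentLengthMetrics).
    max_content_length_metrics: 2097152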
20 changes: 20 additions & 0 deletions .chloggen/splunkhec-exporter-multimetric-batch.yaml
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23365]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
95 changes: 86 additions & 9 deletions exporter/splunkhecexporter/client.go
@@ -97,6 +97,9 @@ func (c *client) pushMetricsData(
		}
	}

	if c.config.UseMultiMetricFormat {
		return c.pushMultiMetricsDataInBatches(ctx, md, localHeaders)
	}
	return c.pushMetricsDataInBatches(ctx, md, localHeaders)
}

@@ -255,15 +258,6 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
				// Parsing metric record to Splunk event.
				events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
				tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
				if c.config.UseMultiMetricFormat {
					merged, err := mergeEventsToMultiMetricFormat(events)
					if err != nil {
						permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
							"error merging events: %w", err)))
					} else {
						events = merged
					}
				}
				for _, event := range events {
					// JSON encoding event and writing to buffer.
					b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
@@ -298,6 +292,43 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
	return iterState{done: true}, permanentErrors
}

func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) {
	var permanentErrors []error
	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
	defer jsonStreamPool.Put(jsonStream)

	for i := is.record; i < len(events); i++ {
		event := events[i]
		// JSON encoding event and writing to buffer.
		b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
		if jsonErr != nil {
			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))
			continue
		}
		_, err := buf.Write(b)
		if errors.Is(err, errOverCapacity) {
			if !buf.Empty() {
				return iterState{
					record: i,
					done:   false,
				}, permanentErrors
			}
			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
				fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
					" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
			return iterState{
				record: i + 1,
				done:   i+1 != len(events),
			}, permanentErrors
		} else if err != nil {
			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
				"error writing the event: %w", err)))
		}
	}

	return iterState{done: true}, permanentErrors
}

func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
	var permanentErrors []error
	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
@@ -345,6 +376,51 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
	return iterState{done: true}, permanentErrors
}

// pushMultiMetricsDataInBatches sends batches of Splunk multi-metric events in JSON format.
// The batch content length is restricted to MaxContentLengthMetrics.
// md metrics are parsed to Splunk events.
func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
	buf := c.bufferPool.get()
	defer c.bufferPool.put(buf)
	is := iterState{}

	var permanentErrors []error
	var events []*splunk.Event
	for i := 0; i < md.ResourceMetrics().Len(); i++ {
		rm := md.ResourceMetrics().At(i)
		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
			sm := rm.ScopeMetrics().At(j)
			for k := 0; k < sm.Metrics().Len(); k++ {
				metric := sm.Metrics().At(k)

				// Parsing metric record to Splunk event.
				events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...)
			}
		}
	}

	merged, err := mergeEventsToMultiMetricFormat(events)
	if err != nil {
		return consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err))
	}

	for !is.done {
		buf.Reset()

		latestIterState, batchPermanentErrors := c.fillMetricsBufferMultiMetrics(merged, buf, is)
		permanentErrors = append(permanentErrors, batchPermanentErrors...)
		if !buf.Empty() {
			if err := c.postEvents(ctx, buf, headers); err != nil {
				return consumererror.NewMetrics(err, md)
			}
		}

		is = latestIterState
	}

	return multierr.Combine(permanentErrors...)
}

// pushMetricsDataInBatches sends batches of Splunk events in JSON format.
// The batch content length is restricted to MaxContentLengthMetrics.
// md metrics are parsed to Splunk events.
@@ -363,6 +439,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
				return consumererror.NewMetrics(err, subMetrics(md, is))
			}
		}

		is = latestIterState
	}
