[chore] [exporter/splunkhec] Reuse jsoniter stream/buffer (open-telemetry#22016)

This improves performance by avoiding a buffer allocation for every JSON-encoded HEC event (see the hypothetical benchmark sketch below).
dmitryax committed May 16, 2023
1 parent f5c1a88 commit 3a7c7f2
Showing 6 changed files with 120 additions and 4 deletions.
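
To illustrate the claim in the commit message, here is a minimal, hypothetical Go benchmark (not part of this commit) comparing per-event jsoniter.Marshal calls, which allocate a fresh buffer each time, against a single reused jsoniter.Stream. The hecEvent struct, its values, and the benchmark names are made up for the sketch; only the jsoniter calls mirror what the commit does.

```go
// bench_test.go — hypothetical; place in any package's _test.go file.
package bench

import (
	"testing"

	jsoniter "github.com/json-iterator/go"
)

// hecEvent is a stand-in for the exporter's splunk.Event type.
type hecEvent struct {
	Host   string                 `json:"host"`
	Event  string                 `json:"event"`
	Fields map[string]interface{} `json:"fields"`
}

// BenchmarkMarshalPerEvent mirrors the old path: jsoniter.Marshal allocates
// a new encoder buffer for every event.
func BenchmarkMarshalPerEvent(b *testing.B) {
	ev := &hecEvent{Host: "myhost", Event: "mylog", Fields: map[string]interface{}{"custom": "custom"}}
	for i := 0; i < b.N; i++ {
		if _, err := jsoniter.Marshal(ev); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkMarshalReusedStream mirrors the new path: one jsoniter.Stream
// (and its internal buffer) is reset and reused for every event.
func BenchmarkMarshalReusedStream(b *testing.B) {
	ev := &hecEvent{Host: "myhost", Event: "mylog", Fields: map[string]interface{}{"custom": "custom"}}
	stream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
	for i := 0; i < b.N; i++ {
		stream.Reset(nil)
		stream.Error = nil
		stream.WriteVal(ev)
		if stream.Error != nil {
			b.Fatal(stream.Error)
		}
		_ = stream.Buffer() // the caller would copy this into the batch buffer
	}
}
```

Run with `go test -bench . -benchmem`; the reused-stream variant should report fewer allocations per operation.
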
8 changes: 7 additions & 1 deletion exporter/splunkhecexporter/buffer.go
@@ -20,6 +20,8 @@ import (
 	"errors"
 	"io"
 	"sync"
+
+	jsoniter "github.com/json-iterator/go"
 )
 
 var (
@@ -36,6 +38,7 @@ type bufferState struct {
 	maxEventLength uint
 	writer io.Writer
 	buf *bytes.Buffer
+	jsonStream *jsoniter.Stream
 	rawLength int
 }
 
@@ -195,15 +198,18 @@ func (p bufferStatePool) put(bf *bufferState) {
 	p.pool.Put(bf)
 }
 
+const initBufferCap = 512
+
 func newBufferStatePool(bufCap uint, compressionAvailable bool, maxEventLength uint) bufferStatePool {
 	return bufferStatePool{
 		&sync.Pool{
 			New: func() interface{} {
-				buf := new(bytes.Buffer)
+				buf := bytes.NewBuffer(make([]byte, 0, initBufferCap))
 				return &bufferState{
 					compressionAvailable: compressionAvailable,
 					writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap},
 					buf: buf,
+					jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
 					bufferMaxLen: bufCap,
 					maxEventLength: maxEventLength,
 				}
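
The buffer.go change above wires the reusable stream into the existing sync.Pool. Below is a minimal, self-contained sketch of how such a pooled state might be used end to end; the names (state, pool, initCap) and the batching logic are illustrative only, not the exporter's actual API.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"

	jsoniter "github.com/json-iterator/go"
)

const initCap = 512

// state bundles a batch buffer with a reusable JSON stream, so neither is
// allocated per event or per batch.
type state struct {
	buf        *bytes.Buffer
	jsonStream *jsoniter.Stream
}

var pool = sync.Pool{
	New: func() interface{} {
		return &state{
			buf:        bytes.NewBuffer(make([]byte, 0, initCap)),
			jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initCap),
		}
	},
}

func main() {
	st := pool.Get().(*state)

	for _, ev := range []map[string]string{{"event": "one"}, {"event": "two"}} {
		st.jsonStream.Reset(nil) // truncate the stream's buffer, keep its capacity
		st.jsonStream.WriteVal(ev)
		if st.jsonStream.Error != nil {
			panic(st.jsonStream.Error)
		}
		st.buf.Write(st.jsonStream.Buffer()) // copy the encoded event into the batch
		st.buf.WriteByte('\n')
	}
	fmt.Print(st.buf.String())

	st.buf.Reset()
	pool.Put(st) // return the state (buffer + stream) for reuse
}
```
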
19 changes: 16 additions & 3 deletions exporter/splunkhecexporter/client.go
@@ -204,9 +204,10 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 	} else {
 		// Parsing log record to Splunk event.
 		event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config)
+
 		// JSON encoding event and writing to buffer.
 		var err error
-		b, err = jsoniter.Marshal(event)
+		b, err = marshalEvent(event, bs.jsonStream)
 		if err != nil {
 			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
 				"dropped log event: %v, error: %w", event, err)))
@@ -261,7 +262,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
 	}
 	for _, event := range events {
 		// JSON encoding event and writing to buffer.
-		b, err := jsoniter.Marshal(event)
+		b, err := marshalEvent(event, bs.jsonStream)
 		if err != nil {
 			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
 			continue
@@ -305,8 +306,9 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
 
 	// Parsing span record to Splunk event.
 	event := mapSpanToSplunkEvent(rs.Resource(), span, c.config)
+
 	// JSON encoding event and writing to buffer.
-	b, err := jsoniter.Marshal(event)
+	b, err := marshalEvent(event, bs.jsonStream)
 	if err != nil {
 		permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
 		continue
@@ -597,3 +599,14 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]
 		"__splunk_app_version": config.SplunkAppVersion,
 	}
 }
+
+// marshalEvent marshals an event to JSON using a reusable jsoniter stream.
+func marshalEvent(event *splunk.Event, stream *jsoniter.Stream) ([]byte, error) {
+	stream.Reset(nil)
+	stream.Error = nil
+	stream.WriteVal(event)
+	if stream.Error != nil {
+		return nil, stream.Error
+	}
+	return stream.Buffer(), nil
+}
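
A note on marshalEvent (an observation about jsoniter's API, not something stated in this diff): stream.Buffer() returns the stream's internal byte slice rather than a copy, so the returned bytes are only valid until the next Reset/WriteVal on the same stream. The fill*Buffer callers appear to write the slice into the batch buffer before the stream is reused; any caller that needed to hold on to the bytes longer would have to copy them first, as in this hypothetical snippet.

```go
package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

func main() {
	stream := jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)

	stream.WriteVal(map[string]string{"event": "first"})
	first := stream.Buffer()              // aliases the stream's internal buffer
	kept := append([]byte(nil), first...) // explicit copy survives reuse

	stream.Reset(nil)
	stream.WriteVal(map[string]string{"event": "second"}) // overwrites the backing array behind `first`

	fmt.Println(string(kept))            // {"event":"first"}
	fmt.Println(string(stream.Buffer())) // {"event":"second"}
}
```
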
52 changes: 52 additions & 0 deletions exporter/splunkhecexporter/client_test.go
@@ -25,6 +25,7 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"os"
 	"regexp"
 	"sort"
 	"strings"
@@ -866,6 +867,57 @@ func TestReceiveRaw(t *testing.T) {
 	}
 }
 
+func TestReceiveLogEvent(t *testing.T) {
+	logs := createLogData(1, 1, 1)
+	cfg := NewFactory().CreateDefaultConfig().(*Config)
+	cfg.DisableCompression = true
+
+	actual, err := runLogExport(cfg, logs, 1, t)
+	assert.Len(t, actual, 1)
+	assert.NoError(t, err)
+
+	compareWithTestData(t, actual[0].body, "testdata/hec_log_event.json")
+}
+
+func TestReceiveMetricEvent(t *testing.T) {
+	metrics := createMetricsData(1)
+	cfg := NewFactory().CreateDefaultConfig().(*Config)
+	cfg.DisableCompression = true
+
+	actual, err := runMetricsExport(cfg, metrics, 1, t)
+	assert.Len(t, actual, 1)
+	assert.NoError(t, err)
+
+	compareWithTestData(t, actual[0].body, "testdata/hec_metric_event.json")
+}
+
+func TestReceiveSpanEvent(t *testing.T) {
+	traces := createTraceData(1)
+	cfg := NewFactory().CreateDefaultConfig().(*Config)
+	cfg.DisableCompression = true
+
+	actual, err := runTraceExport(cfg, traces, 1, t)
+	assert.Len(t, actual, 1)
+	assert.NoError(t, err)
+
+	compareWithTestData(t, actual[0].body, "testdata/hec_span_event.json")
+}
+
+// compareWithTestData compares hec output with a json file using maps instead of strings to avoid key ordering
+// issues (jsoniter doesn't sort the keys).
+func compareWithTestData(t *testing.T, actual []byte, file string) {
+	wantStr, err := os.ReadFile(file)
+	require.NoError(t, err)
+	wantMap := map[string]any{}
+	err = jsoniter.Unmarshal(wantStr, &wantMap)
+	require.NoError(t, err)
+
+	gotMap := map[string]any{}
+	err = jsoniter.Unmarshal(actual, &gotMap)
+	require.NoError(t, err)
+	assert.Equal(t, wantMap, gotMap)
+}
+
 func TestReceiveMetrics(t *testing.T) {
 	md := createMetricsData(3)
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
11 changes: 11 additions & 0 deletions exporter/splunkhecexporter/testdata/hec_log_event.json
@@ -0,0 +1,11 @@
+{
+  "host": "myhost",
+  "source": "myapp",
+  "sourcetype": "myapp-type",
+  "index": "myindex",
+  "event": "mylog",
+  "fields": {
+    "otel.log.name": "0_0_0",
+    "custom": "custom"
+  }
+}
14 changes: 14 additions & 0 deletions exporter/splunkhecexporter/testdata/hec_metric_event.json
@@ -0,0 +1,14 @@
+{
+  "host": "unknown",
+  "event": "metric",
+  "fields": {
+    "k/n1": "vn1",
+    "k/r0": "vr0",
+    "k/r1": "vr1",
+    "metric_name:gauge_double_with_dims": 1234.5678,
+    "metric_type": "Gauge",
+    "k0": "v0",
+    "k1": "v1",
+    "k/n0": "vn0"
+  }
+}
20 changes: 20 additions & 0 deletions exporter/splunkhecexporter/testdata/hec_span_event.json
@@ -0,0 +1,20 @@
+{
+  "time": 1,
+  "host": "unknown",
+  "event": {
+    "trace_id": "01010101010101010101010101010101",
+    "span_id": "0000000000000001",
+    "parent_span_id": "0102030405060708",
+    "name": "root",
+    "end_time": 2000000000,
+    "kind": "SPAN_KIND_UNSPECIFIED",
+    "status": {
+      "message": "ok",
+      "code": "STATUS_CODE_OK"
+    },
+    "start_time": 1000000000
+  },
+  "fields": {
+    "resource": "R1"
+  }
+}
