diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go index e12481ccd52d0..28eda90b7ba24 100644 --- a/exporter/splunkhecexporter/buffer.go +++ b/exporter/splunkhecexporter/buffer.go @@ -20,6 +20,8 @@ import ( "errors" "io" "sync" + + jsoniter "github.com/json-iterator/go" ) var ( @@ -36,6 +38,7 @@ type bufferState struct { maxEventLength uint writer io.Writer buf *bytes.Buffer + jsonStream *jsoniter.Stream rawLength int } @@ -195,15 +198,18 @@ func (p bufferStatePool) put(bf *bufferState) { p.pool.Put(bf) } +const initBufferCap = 512 + func newBufferStatePool(bufCap uint, compressionAvailable bool, maxEventLength uint) bufferStatePool { return bufferStatePool{ &sync.Pool{ New: func() interface{} { - buf := new(bytes.Buffer) + buf := bytes.NewBuffer(make([]byte, 0, initBufferCap)) return &bufferState{ compressionAvailable: compressionAvailable, writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap}, buf: buf, + jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap), bufferMaxLen: bufCap, maxEventLength: maxEventLength, } diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index aa40d12440323..1350b7353a358 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -204,9 +204,10 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) ( } else { // Parsing log record to Splunk event. event := mapLogRecordToSplunkEvent(rl.Resource(), logRecord, c.config) + // JSON encoding event and writing to buffer. var err error - b, err = jsoniter.Marshal(event) + b, err = marshalEvent(event, bs.jsonStream) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( "dropped log event: %v, error: %w", event, err))) @@ -261,7 +262,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is } for _, event := range events { // JSON encoding event and writing to buffer. - b, err := jsoniter.Marshal(event) + b, err := marshalEvent(event, bs.jsonStream) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err))) continue @@ -305,8 +306,9 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter // Parsing span record to Splunk event. event := mapSpanToSplunkEvent(rs.Resource(), span, c.config) + // JSON encoding event and writing to buffer. - b, err := jsoniter.Marshal(event) + b, err := marshalEvent(event, bs.jsonStream) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) continue @@ -597,3 +599,14 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string] "__splunk_app_version": config.SplunkAppVersion, } } + +// marshalEvent marshals an event to JSON using a reusable jsoniter stream. +func marshalEvent(event *splunk.Event, stream *jsoniter.Stream) ([]byte, error) { + stream.Reset(nil) + stream.Error = nil + stream.WriteVal(event) + if stream.Error != nil { + return nil, stream.Error + } + return stream.Buffer(), nil +} diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 5b5b67d7cce57..9956fe189bb4b 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -25,6 +25,7 @@ import ( "net" "net/http" "net/url" + "os" "regexp" "sort" "strings" @@ -866,6 +867,57 @@ func TestReceiveRaw(t *testing.T) { } } +func TestReceiveLogEvent(t *testing.T) { + logs := createLogData(1, 1, 1) + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.DisableCompression = true + + actual, err := runLogExport(cfg, logs, 1, t) + assert.Len(t, actual, 1) + assert.NoError(t, err) + + compareWithTestData(t, actual[0].body, "testdata/hec_log_event.json") +} + +func TestReceiveMetricEvent(t *testing.T) { + metrics := createMetricsData(1) + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.DisableCompression = true + + actual, err := runMetricsExport(cfg, metrics, 1, t) + assert.Len(t, actual, 1) + assert.NoError(t, err) + + compareWithTestData(t, actual[0].body, "testdata/hec_metric_event.json") +} + +func TestReceiveSpanEvent(t *testing.T) { + traces := createTraceData(1) + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.DisableCompression = true + + actual, err := runTraceExport(cfg, traces, 1, t) + assert.Len(t, actual, 1) + assert.NoError(t, err) + + compareWithTestData(t, actual[0].body, "testdata/hec_span_event.json") +} + +// compareWithTestData compares hec output with a json file using maps instead of strings to avoid key ordering +// issues (jsoniter doesn't sort the keys). +func compareWithTestData(t *testing.T, actual []byte, file string) { + wantStr, err := os.ReadFile(file) + require.NoError(t, err) + wantMap := map[string]any{} + err = jsoniter.Unmarshal(wantStr, &wantMap) + require.NoError(t, err) + + gotMap := map[string]any{} + err = jsoniter.Unmarshal(actual, &gotMap) + require.NoError(t, err) + assert.Equal(t, wantMap, gotMap) +} + func TestReceiveMetrics(t *testing.T) { md := createMetricsData(3) cfg := NewFactory().CreateDefaultConfig().(*Config) diff --git a/exporter/splunkhecexporter/testdata/hec_log_event.json b/exporter/splunkhecexporter/testdata/hec_log_event.json new file mode 100644 index 0000000000000..bd64c54360a66 --- /dev/null +++ b/exporter/splunkhecexporter/testdata/hec_log_event.json @@ -0,0 +1,11 @@ +{ + "host": "myhost", + "source": "myapp", + "sourcetype": "myapp-type", + "index": "myindex", + "event": "mylog", + "fields": { + "otel.log.name": "0_0_0", + "custom": "custom" + } +} diff --git a/exporter/splunkhecexporter/testdata/hec_metric_event.json b/exporter/splunkhecexporter/testdata/hec_metric_event.json new file mode 100644 index 0000000000000..20858fa9fe331 --- /dev/null +++ b/exporter/splunkhecexporter/testdata/hec_metric_event.json @@ -0,0 +1,14 @@ +{ + "host": "unknown", + "event": "metric", + "fields": { + "k/n1": "vn1", + "k/r0": "vr0", + "k/r1": "vr1", + "metric_name:gauge_double_with_dims": 1234.5678, + "metric_type": "Gauge", + "k0": "v0", + "k1": "v1", + "k/n0": "vn0" + } +} diff --git a/exporter/splunkhecexporter/testdata/hec_span_event.json b/exporter/splunkhecexporter/testdata/hec_span_event.json new file mode 100644 index 0000000000000..dab1ab8094f90 --- /dev/null +++ b/exporter/splunkhecexporter/testdata/hec_span_event.json @@ -0,0 +1,20 @@ +{ + "time": 1, + "host": "unknown", + "event": { + "trace_id": "01010101010101010101010101010101", + "span_id": "0000000000000001", + "parent_span_id": "0102030405060708", + "name": "root", + "end_time": 2000000000, + "kind": "SPAN_KIND_UNSPECIFIED", + "status": { + "message": "ok", + "code": "STATUS_CODE_OK" + }, + "start_time": 1000000000 + }, + "fields": { + "resource": "R1" + } +}