diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index 6dba421594d6..c57500b25524 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -184,8 +184,10 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 	for i := is.resource; i < logs.ResourceLogs().Len(); i++ {
 		rl := logs.ResourceLogs().At(i)
 		for j := is.library; j < rl.ScopeLogs().Len(); j++ {
+			is.library = 0 // Reset library index for next resource.
 			sl := rl.ScopeLogs().At(j)
 			for k := is.record; k < sl.LogRecords().Len(); k++ {
+				is.record = 0 // Reset record index for next library.
 				logRecord := sl.LogRecords().At(k)
 
 				if c.config.ExportRaw {
@@ -233,8 +235,10 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
 	for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
 		rm := metrics.ResourceMetrics().At(i)
 		for j := is.library; j < rm.ScopeMetrics().Len(); j++ {
+			is.library = 0 // Reset library index for next resource.
 			sm := rm.ScopeMetrics().At(j)
 			for k := is.record; k < sm.Metrics().Len(); k++ {
+				is.record = 0 // Reset record index for next library.
 				metric := sm.Metrics().At(k)
 
 				// Parsing metric record to Splunk event.
@@ -289,8 +293,10 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
 	for i := is.resource; i < traces.ResourceSpans().Len(); i++ {
 		rs := traces.ResourceSpans().At(i)
 		for j := is.library; j < rs.ScopeSpans().Len(); j++ {
+			is.library = 0 // Reset library index for next resource.
 			ss := rs.ScopeSpans().At(j)
 			for k := is.record; k < ss.Spans().Len(); k++ {
+				is.record = 0 // Reset record index for next library.
 				span := ss.Spans().At(k)
 
 				// Parsing span record to Splunk event.
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index 195f585e2248..b538b58aaf46 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -72,50 +72,55 @@ func newTestClientWithPresetResponses(codes []int, bodies []string) (*http.Clien
 	}, &headers
 }
 
-func createMetricsData(numberOfDataPoints int) pmetric.Metrics {
-
+func createMetricsData(resourcesNum, dataPointsNum int) pmetric.Metrics {
 	doubleVal := 1234.5678
 	metrics := pmetric.NewMetrics()
-	rm := metrics.ResourceMetrics().AppendEmpty()
-	rm.Resource().Attributes().PutStr("k0", "v0")
-	rm.Resource().Attributes().PutStr("k1", "v1")
-
-	for i := 0; i < numberOfDataPoints; i++ {
-		tsUnix := time.Unix(int64(i), int64(i)*time.Millisecond.Nanoseconds())
-
-		ilm := rm.ScopeMetrics().AppendEmpty()
-		metric := ilm.Metrics().AppendEmpty()
-		metric.SetName("gauge_double_with_dims")
-		doublePt := metric.SetEmptyGauge().DataPoints().AppendEmpty()
-		doublePt.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
-		doublePt.SetDoubleValue(doubleVal)
-		doublePt.Attributes().PutStr("k/n0", "vn0")
-		doublePt.Attributes().PutStr("k/n1", "vn1")
-		doublePt.Attributes().PutStr("k/r0", "vr0")
-		doublePt.Attributes().PutStr("k/r1", "vr1")
+
+	for i := 0; i < resourcesNum; i++ {
+		rm := metrics.ResourceMetrics().AppendEmpty()
+		rm.Resource().Attributes().PutStr("k0", fmt.Sprintf("v%d", i))
+		rm.Resource().Attributes().PutStr("k1", "v1")
+		for j := 0; j < dataPointsNum; j++ {
+			count := i*dataPointsNum + j
+			tsUnix := time.Unix(int64(count), int64(count)*time.Millisecond.Nanoseconds())
+			ilm := rm.ScopeMetrics().AppendEmpty()
+			metric := ilm.Metrics().AppendEmpty()
+			metric.SetName("gauge_double_with_dims")
+			doublePt := metric.SetEmptyGauge().DataPoints().AppendEmpty()
+			doublePt.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
+			doublePt.SetDoubleValue(doubleVal)
+			doublePt.Attributes().PutStr("k/n0", "vn0")
+			doublePt.Attributes().PutStr("k/n1", "vn1")
+			doublePt.Attributes().PutStr("k/r0", "vr0")
+			doublePt.Attributes().PutStr("k/r1", "vr1")
+		}
 	}
 
 	return metrics
 }
 
-func createTraceData(numberOfTraces int) ptrace.Traces {
+func createTraceData(resourcesNum int, spansNum int) ptrace.Traces {
 	traces := ptrace.NewTraces()
 	rs := traces.ResourceSpans().AppendEmpty()
-	rs.Resource().Attributes().PutStr("resource", "R1")
-	ils := rs.ScopeSpans().AppendEmpty()
-	ils.Spans().EnsureCapacity(numberOfTraces)
-	for i := 0; i < numberOfTraces; i++ {
-		span := ils.Spans().AppendEmpty()
-		span.SetName("root")
-		span.SetStartTimestamp(pcommon.Timestamp((i + 1) * 1e9))
-		span.SetEndTimestamp(pcommon.Timestamp((i + 2) * 1e9))
-		span.SetTraceID([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
-		span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1})
-		span.TraceState().FromRaw("foo")
-		if i%2 == 0 {
-			span.SetParentSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
-			span.Status().SetCode(ptrace.StatusCodeOk)
-			span.Status().SetMessage("ok")
+
+	for i := 0; i < resourcesNum; i++ {
+		rs.Resource().Attributes().PutStr("resource", fmt.Sprintf("R%d", i))
+		ils := rs.ScopeSpans().AppendEmpty()
+		ils.Spans().EnsureCapacity(spansNum)
+		for j := 0; j < spansNum; j++ {
+			span := ils.Spans().AppendEmpty()
+			span.SetName("root")
+			count := i*spansNum + j
+			span.SetStartTimestamp(pcommon.Timestamp((count + 1) * 1e9))
+			span.SetEndTimestamp(pcommon.Timestamp((count + 2) * 1e9))
+			span.SetTraceID([16]byte{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1})
+			span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1})
+			span.TraceState().FromRaw("foo")
+			if count%2 == 0 {
+				span.SetParentSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
+				span.Status().SetCode(ptrace.StatusCodeOk)
+				span.Status().SetMessage("ok")
+			}
 		}
 	}
 
@@ -387,7 +392,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 	}{
 		{
 			name:   "all trace events in payload when max content length unknown (configured max content length 0)",
-			traces: createTraceData(4),
+			traces: createTraceData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 0
@@ -405,7 +410,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 		},
 		{
 			name:   "1 trace event per payload (configured max content length is same as event size)",
-			traces: createTraceData(4),
+			traces: createTraceData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 320
@@ -423,7 +428,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 		},
 		{
 			name:   "2 trace events per payload (configured max content length is twice event size)",
-			traces: createTraceData(4),
+			traces: createTraceData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 640
@@ -439,7 +444,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 		},
 		{
 			name:   "1 compressed batch of 2037 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
-			traces: createTraceData(10),
+			traces: createTraceData(1, 10),
 			conf: func() *Config {
 				return NewFactory().CreateDefaultConfig().(*Config)
 			}(),
@@ -453,7 +458,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 		},
 		{
 			name:   "100 events, make sure that we produce more than one compressed batch",
-			traces: createTraceData(100),
+			traces: createTraceData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = minCompressionLen + 500
@@ -465,9 +470,10 @@ func TestReceiveTracesBatches(t *testing.T) {
 				numBatches: 2,
 				compressed: true,
 			},
-		}, {
+		},
+		{
 			name:   "100 events, make sure that we produce only one compressed batch when MaxContentLengthTraces is 0",
-			traces: createTraceData(100),
+			traces: createTraceData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 0
@@ -481,6 +487,19 @@ func TestReceiveTracesBatches(t *testing.T) {
 				compressed: true,
 			},
 		},
+		{
+			name:   "10 resources, 10 spans, no compression",
+			traces: createTraceData(10, 10),
+			conf: func() *Config {
+				cfg := NewFactory().CreateDefaultConfig().(*Config)
+				cfg.DisableCompression = true
+				cfg.MaxContentLengthTraces = 5000
+				return cfg
+			}(),
+			want: wantType{
+				numBatches: 7,
+			},
+		},
 	}
 
 	for _, test := range tests {
@@ -490,19 +509,42 @@ func TestReceiveTracesBatches(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, got, test.want.numBatches, "expected exact number of batches")
 
-			for i := 0; i < test.want.numBatches; i++ {
+			for i, batch := range test.want.batches {
 				require.NotZero(t, got[i])
 				if test.conf.MaxContentLengthTraces != 0 {
 					require.True(t, int(test.conf.MaxContentLengthTraces) > len(got[i].body))
 				}
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i].body)
+					validateCompressedContains(t, batch, got[i].body)
 				} else {
-					for _, expected := range test.want.batches[i] {
+					for _, expected := range batch {
 						assert.Contains(t, string(got[i].body), expected)
 					}
 				}
 			}
+
+			// ensure all events are sent out
+			for i := 1; i < test.traces.SpanCount(); i++ {
+				eventFound := false
+				for _, batch := range got {
+					batchBody := batch.body
+					if test.want.compressed {
+						z, err := gzip.NewReader(bytes.NewReader(batchBody))
+						require.NoError(t, err)
+						batchBody, err = io.ReadAll(z)
+						z.Close()
+						require.NoError(t, err)
+					}
+					timeStr := fmt.Sprintf(`"time":%d,`, i+1)
+					if strings.Contains(string(batchBody), timeStr) {
+						if eventFound {
+							t.Errorf("span event %d found in multiple batches", i)
+						}
+						eventFound = true
+					}
+				}
+				assert.Truef(t, eventFound, "span event %d not found in any batch", i)
+			}
 		})
 	}
 }
@@ -513,6 +555,7 @@ func TestReceiveLogs(t *testing.T) {
 		numBatches int
 		compressed bool
 		wantErr    string
+		wantDrops  int // expected number of dropped events
 	}
 
 	// The test cases depend on the constant minCompressionLen = 1500.
@@ -704,6 +747,7 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_0"`, `"otel.log.name":"0_0_1"`},
 				},
 				numBatches: 1,
+				wantDrops:  1,
 			},
 		},
 		{
@@ -732,32 +776,80 @@ func TestReceiveLogs(t *testing.T) {
 				numBatches: 3,
 			},
 		},
+		{
+			name: "10 resource logs, 1 scope logs, 10 log records, no compression",
+			logs: createLogData(10, 1, 10),
+			conf: func() *Config {
+				cfg := NewFactory().CreateDefaultConfig().(*Config)
+				cfg.DisableCompression = true
+				cfg.MaxContentLengthLogs = 5000
+				return cfg
+			}(),
+			want: wantType{
+				numBatches: 4,
+			},
+		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			got, err := runLogExport(test.conf, test.logs, test.want.numBatches, t)
-
 			if test.want.wantErr != "" {
 				require.EqualError(t, err, test.want.wantErr)
-			} else {
-				require.NoError(t, err)
+				return
 			}
+			require.NoError(t, err)
 
 			require.Equal(t, test.want.numBatches, len(got))
-			for i := 0; i < test.want.numBatches; i++ {
+			for i, wantBatch := range test.want.batches {
 				require.NotZero(t, got[i])
 				if test.conf.MaxContentLengthLogs != 0 {
 					require.True(t, int(test.conf.MaxContentLengthLogs) > len(got[i].body))
 				}
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i].body)
+					validateCompressedContains(t, wantBatch, got[i].body)
 				} else {
-					for _, expected := range test.want.batches[i] {
+					for _, expected := range wantBatch {
 						assert.Contains(t, string(got[i].body), expected)
 					}
 				}
 			}
+
+			// ensure all events are sent out
+			droppedCount := test.logs.LogRecordCount()
+			for i := 0; i < test.logs.ResourceLogs().Len(); i++ {
+				rl := test.logs.ResourceLogs().At(i)
+				for j := 0; j < rl.ScopeLogs().Len(); j++ {
+					sl := rl.ScopeLogs().At(j)
+					for k := 0; k < sl.LogRecords().Len(); k++ {
+						lr := sl.LogRecords().At(k)
+						attrVal, ok := lr.Attributes().Get("otel.log.name")
+						require.True(t, ok)
+						eventFound := false
+						for _, batch := range got {
+							batchBody := batch.body
+							if test.want.compressed {
+								z, err := gzip.NewReader(bytes.NewReader(batchBody))
+								require.NoError(t, err)
+								batchBody, err = io.ReadAll(z)
+								z.Close()
+								require.NoError(t, err)
+							}
+							if strings.Contains(string(batchBody), fmt.Sprintf(`"%s"`, attrVal.Str())) {
+								if eventFound {
+									t.Errorf("log event %s found in multiple batches", attrVal.Str())
+								}
+								eventFound = true
+								droppedCount--
+							}
+						}
+						if test.want.wantDrops == 0 {
+							assert.Truef(t, eventFound, "log event %s not found in any batch", attrVal.Str())
+						}
+					}
+				}
+			}
+			assert.Equal(t, test.want.wantDrops, droppedCount, "expected %d dropped events, got %d", test.want.wantDrops, droppedCount)
 		})
 	}
 }
@@ -869,7 +961,7 @@ func TestReceiveLogEvent(t *testing.T) {
 }
 
 func TestReceiveMetricEvent(t *testing.T) {
-	metrics := createMetricsData(1)
+	metrics := createMetricsData(1, 1)
 
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.DisableCompression = true
@@ -881,7 +973,7 @@ func TestReceiveMetricEvent(t *testing.T) {
 }
 
 func TestReceiveSpanEvent(t *testing.T) {
-	traces := createTraceData(1)
+	traces := createTraceData(1, 1)
 
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.DisableCompression = true
@@ -908,7 +1000,7 @@ func compareWithTestData(t *testing.T, actual []byte, file string) {
 }
 
 func TestReceiveMetrics(t *testing.T) {
-	md := createMetricsData(3)
+	md := createMetricsData(1, 3)
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.DisableCompression = true
 	actual, err := runMetricsExport(cfg, md, 1, t)
@@ -939,7 +1031,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 	}{
 		{
 			name:    "all metrics events in payload when max content length unknown (configured max content length 0)",
-			metrics: createMetricsData(4),
+			metrics: createMetricsData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 0
@@ -954,7 +1046,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		},
 		{
 			name:    "1 metric event per payload (configured max content length is same as event size)",
-			metrics: createMetricsData(4),
+			metrics: createMetricsData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 300
@@ -972,7 +1064,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		},
 		{
 			name:    "2 metric events per payload (configured max content length is twice event size)",
-			metrics: createMetricsData(4),
+			metrics: createMetricsData(1, 4),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 448
@@ -988,7 +1080,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		},
 		{
 			name:    "1 compressed batch of 2037 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
-			metrics: createMetricsData(10),
+			metrics: createMetricsData(1, 10),
 			conf: func() *Config {
 				return NewFactory().CreateDefaultConfig().(*Config)
 			}(),
@@ -1002,7 +1094,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		},
 		{
 			name:    "200 events, make sure that we produce more than one compressed batch",
-			metrics: createMetricsData(100),
+			metrics: createMetricsData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = minCompressionLen + 150
@@ -1019,7 +1111,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		},
 		{
 			name:    "200 events, make sure that we produce only one compressed batch when MaxContentLengthMetrics is 0",
-			metrics: createMetricsData(100),
+			metrics: createMetricsData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 0
@@ -1033,6 +1125,19 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 				compressed: true,
 			},
 		},
+		{
+			name:    "10 resources, 10 datapoints, no compression",
+			metrics: createMetricsData(10, 10),
+			conf: func() *Config {
+				cfg := NewFactory().CreateDefaultConfig().(*Config)
+				cfg.DisableCompression = true
+				cfg.MaxContentLengthMetrics = 5000
+				return cfg
+			}(),
+			want: wantType{
+				numBatches: 5,
+			},
+		},
 	}
 
 	for _, test := range tests {
@@ -1042,17 +1147,17 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			require.NoError(t, err)
 			require.Len(t, got, test.want.numBatches)
 
-			for i := 0; i < test.want.numBatches; i++ {
+			for i, batch := range test.want.batches {
 				require.NotZero(t, got[i])
 				if test.conf.MaxContentLengthMetrics != 0 {
 					require.True(t, int(test.conf.MaxContentLengthMetrics) > len(got[i].body))
 				}
 				if test.want.compressed {
-					validateCompressedContains(t, test.want.batches[i], got[i].body)
+					validateCompressedContains(t, batch, got[i].body)
 				} else {
 					found := false
-					for _, expected := range test.want.batches[i] {
+					for _, expected := range batch {
 						if strings.Contains(string(got[i].body), expected) {
 							found = true
 							break
@@ -1061,6 +1166,29 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 					assert.True(t, found, "%s did not match any expected batch", string(got[i].body))
 				}
 			}
+
+			// ensure all events are sent out
+			for i := 1; i < test.metrics.MetricCount(); i++ {
+				eventFound := false
+				for _, batch := range got {
+					batchBody := batch.body
+					if test.want.compressed {
+						z, err := gzip.NewReader(bytes.NewReader(batchBody))
+						require.NoError(t, err)
+						batchBody, err = io.ReadAll(z)
+						z.Close()
+						require.NoError(t, err)
+					}
+					time := float64(i) + 0.001*float64(i)
+					if strings.Contains(string(batchBody), fmt.Sprintf(`"time":%g`, time)) {
+						if eventFound {
+							t.Errorf("metric event %d found in multiple batches", i)
+						}
+						eventFound = true
+					}
+				}
+				assert.Truef(t, eventFound, "metric event %d not found in any batch", i)
+			}
 		})
 	}
 }
@@ -1100,7 +1228,7 @@ func Test_PushMetricsData_Summary_NaN_Sum(t *testing.T) {
 func TestReceiveMetricsWithCompression(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.MaxContentLengthMetrics = 1800
-	request, err := runMetricsExport(cfg, createMetricsData(100), 1, t)
+	request, err := runMetricsExport(cfg, createMetricsData(1, 100), 1, t)
 	assert.NoError(t, err)
 	assert.Equal(t, "gzip", request[0].headers.Get("Content-Encoding"))
 	assert.NotEqual(t, "", request)
@@ -1143,7 +1271,7 @@ func TestErrorReceived(t *testing.T) {
 		assert.NoError(t, exporter.Shutdown(context.Background()))
 	}()
 
-	td := createTraceData(3)
+	td := createTraceData(1, 3)
 	err = exporter.ConsumeTraces(context.Background(), td)
 
 	select {
@@ -1184,7 +1312,7 @@ func TestInvalidURL(t *testing.T) {
 	defer func() {
 		assert.NoError(t, exporter.Shutdown(context.Background()))
 	}()
-	td := createTraceData(2)
+	td := createTraceData(1, 2)
 	err = exporter.ConsumeTraces(context.Background(), td)
 
 	assert.EqualError(t, err, "Post \"ftp://example.com:134/services/collector\": unsupported protocol scheme \"ftp\"")
diff --git a/exporter/splunkhecexporter/testdata/hec_span_event.json b/exporter/splunkhecexporter/testdata/hec_span_event.json
index dab1ab8094f9..7085955ec27a 100644
--- a/exporter/splunkhecexporter/testdata/hec_span_event.json
+++ b/exporter/splunkhecexporter/testdata/hec_span_event.json
@@ -15,6 +15,6 @@
     "start_time": 1000000000
   },
   "fields": {
-    "resource": "R1"
+    "resource": "R0"
   }
 }