From f8e184415d395f0025e14ba0667dbcb2932d622b Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 10 May 2023 15:51:14 -0700 Subject: [PATCH] [chore] [exporter/splunkhec] Remove redundant bufferState fields (#21774) This removes the need to maintain several fields bound to each other: compressionEnabled/writer and rawLength/containsData. This also removes bool fields scattered across the struct, reducing the overall size of it from 88 to 80 bytes. It doesn't significantly affect the performance, though --- exporter/splunkhecexporter/buffer.go | 19 ++++++++++--------- exporter/splunkhecexporter/client.go | 14 +++++++------- exporter/splunkhecexporter/hec_worker.go | 2 +- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go index aa0f6c8d525a..84a325e2ad29 100644 --- a/exporter/splunkhecexporter/buffer.go +++ b/exporter/splunkhecexporter/buffer.go @@ -31,7 +31,6 @@ const minCompressionLen = 1500 // bufferState encapsulates intermediate buffer state when pushing data type bufferState struct { compressionAvailable bool - compressionEnabled bool bufferMaxLen uint maxEventLength uint writer io.Writer @@ -39,15 +38,21 @@ type bufferState struct { resource int // index in ResourceLogs/ResourceMetrics/ResourceSpans list library int // index in ScopeLogs/ScopeMetrics/ScopeSpans list record int // index in Logs/Metrics/Spans list - containsData bool rawLength int } +func (b *bufferState) compressionEnabled() bool { + _, ok := b.writer.(*cancellableGzipWriter) + return ok +} + +func (b *bufferState) containsData() bool { + return b.rawLength > 0 +} + func (b *bufferState) reset() { b.buf.Reset() - b.compressionEnabled = false b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen} - b.containsData = false b.rawLength = 0 } @@ -73,7 +78,7 @@ func (b *bufferState) accept(data []byte) (bool, error) { if overCapacity { bufLen += len(data) } - if 
b.compressionAvailable && !b.compressionEnabled && bufLen > minCompressionLen { + if b.compressionAvailable && !b.compressionEnabled() && bufLen > minCompressionLen { // switch over to a zip buffer. tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding)) writer := gzip.NewWriter(tmpBuf) @@ -95,7 +100,6 @@ func (b *bufferState) accept(data []byte) (bool, error) { } b.writer = zipWriter b.buf = tmpBuf - b.compressionEnabled = true // if the byte writer was over capacity, try to write the new entry in the zip writer: if overCapacity { if _, err2 := zipWriter.Write(data); err2 != nil { @@ -108,13 +112,11 @@ func (b *bufferState) accept(data []byte) (bool, error) { } b.rawLength += len(data) - b.containsData = true return true, nil } if overCapacity { return false, nil } - b.containsData = true b.rawLength += len(data) return true, err } @@ -184,7 +186,6 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, maxEventLength return &bufferState{ compressionAvailable: compressionAvailable, - compressionEnabled: false, writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap}, buf: buf, bufferMaxLen: bufCap, diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 8b30070c6d50..6e6f890ab857 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -180,14 +180,14 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers } // There's some leftover unsent non-profiling data - if bufState != nil && bufState.containsData { + if bufState != nil && bufState.containsData() { if err := c.postEvents(ctx, bufState, headers); err != nil { return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState)) } } // There's some leftover unsent profiling data - if profilingBufState != nil && profilingBufState.containsData { + if profilingBufState != nil && profilingBufState.containsData() { if err := c.postEvents(ctx, 
profilingBufState, profilingLocalHeaders); err != nil { // Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above. return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState)) @@ -231,7 +231,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, continue } - if state.containsData { + if state.containsData() { if err := c.postEvents(ctx, state, headers); err != nil { return permanentErrors, err } @@ -297,7 +297,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet continue } - if state.containsData { + if state.containsData() { if err := c.postEvents(ctx, state, headers); err != nil { return permanentErrors, err } @@ -350,7 +350,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli continue } - if state.containsData { + if state.containsData() { if err = c.postEvents(ctx, state, headers); err != nil { return permanentErrors, err } @@ -403,7 +403,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric } // There's some leftover unsent metrics - if bufState.containsData { + if bufState.containsData() { if err := c.postEvents(ctx, bufState, headers); err != nil { return consumererror.NewMetrics(err, subMetrics(md, bufState)) } @@ -438,7 +438,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, } // There's some leftover unsent traces - if bufState.containsData { + if bufState.containsData() { if err := c.postEvents(ctx, bufState, headers); err != nil { return consumererror.NewTraces(err, subTraces(td, bufState)) } diff --git a/exporter/splunkhecexporter/hec_worker.go b/exporter/splunkhecexporter/hec_worker.go index e34127663996..d242d0c2f83e 100644 --- a/exporter/splunkhecexporter/hec_worker.go +++ b/exporter/splunkhecexporter/hec_worker.go @@ -53,7 +53,7 @@ func (hec *defaultHecWorker) send(ctx context.Context, bufferState *bufferState, req.Header.Set(k, v) } 
- if bufferState.compressionEnabled { + if bufferState.compressionEnabled() { req.Header.Set("Content-Encoding", "gzip") }