Commit f8e1844

[chore] [exporter/splunkhec] Remove redundant bufferState fields (open-telemetry#21774)

This removes the need to maintain several fields that are bound to each other: compressionEnabled/writer and rawLength/containsData. It also removes the bool fields scattered across the struct, reducing its overall size from 88 to 80 bytes. Performance is not significantly affected.
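A quick back-of-the-envelope check of the size claim (a sketch, not part of the commit): on 64-bit platforms, each of the two removed bools forced alignment padding ahead of the 8-byte-aligned fields that follow it. Mirroring the before/after field layouts from the diff below:

package main

import (
	"bytes"
	"fmt"
	"io"
	"unsafe"
)

// Field layout of bufferState before this commit.
type before struct {
	compressionAvailable bool // 1 byte
	compressionEnabled   bool // 1 byte + 6 bytes padding before bufferMaxLen
	bufferMaxLen         uint
	maxEventLength       uint
	writer               io.Writer // interface value: two 8-byte words
	buf                  *bytes.Buffer
	resource             int
	library              int
	record               int
	containsData         bool // 1 byte + 7 bytes padding before rawLength
	rawLength            int
}

// Field layout after the two redundant bools are removed.
type after struct {
	compressionAvailable bool // 1 byte + 7 bytes padding
	bufferMaxLen         uint
	maxEventLength       uint
	writer               io.Writer
	buf                  *bytes.Buffer
	resource             int
	library              int
	record               int
	rawLength            int
}

func main() {
	fmt.Println(unsafe.Sizeof(before{}), unsafe.Sizeof(after{})) // 88 80
}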
dmitryax committed May 10, 2023
1 parent 7ecc63c commit f8e1844
Showing 3 changed files with 18 additions and 17 deletions.
19 changes: 10 additions & 9 deletions exporter/splunkhecexporter/buffer.go
@@ -31,23 +31,28 @@ const minCompressionLen = 1500
 // bufferState encapsulates intermediate buffer state when pushing data
 type bufferState struct {
 	compressionAvailable bool
-	compressionEnabled   bool
 	bufferMaxLen         uint
 	maxEventLength       uint
 	writer               io.Writer
 	buf                  *bytes.Buffer
 	resource             int // index in ResourceLogs/ResourceMetrics/ResourceSpans list
 	library              int // index in ScopeLogs/ScopeMetrics/ScopeSpans list
 	record               int // index in Logs/Metrics/Spans list
-	containsData         bool
 	rawLength            int
 }
 
+func (b *bufferState) compressionEnabled() bool {
+	_, ok := b.writer.(*cancellableGzipWriter)
+	return ok
+}
+
+func (b *bufferState) containsData() bool {
+	return b.rawLength > 0
+}
+
 func (b *bufferState) reset() {
 	b.buf.Reset()
-	b.compressionEnabled = false
 	b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen}
-	b.containsData = false
 	b.rawLength = 0
 }

@@ -73,7 +78,7 @@ func (b *bufferState) accept(data []byte) (bool, error) {
 	if overCapacity {
 		bufLen += len(data)
 	}
-	if b.compressionAvailable && !b.compressionEnabled && bufLen > minCompressionLen {
+	if b.compressionAvailable && !b.compressionEnabled() && bufLen > minCompressionLen {
 		// switch over to a zip buffer.
 		tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding))
 		writer := gzip.NewWriter(tmpBuf)

@@ -95,7 +100,6 @@ func (b *bufferState) accept(data []byte) (bool, error) {
 		}
 		b.writer = zipWriter
 		b.buf = tmpBuf
-		b.compressionEnabled = true
 		// if the byte writer was over capacity, try to write the new entry in the zip writer:
 		if overCapacity {
 			if _, err2 := zipWriter.Write(data); err2 != nil {

@@ -108,13 +112,11 @@ func (b *bufferState) accept(data []byte) (bool, error) {
 
 		}
 		b.rawLength += len(data)
-		b.containsData = true
 		return true, nil
 	}
 	if overCapacity {
 		return false, nil
 	}
-	b.containsData = true
 	b.rawLength += len(data)
 	return true, err
 }

@@ -184,7 +186,6 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, maxEventLength
 
 	return &bufferState{
 		compressionAvailable: compressionAvailable,
-		compressionEnabled:   false,
 		writer:               &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap},
 		buf:                  buf,
 		bufferMaxLen:         bufCap,
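The new compressionEnabled() derives the answer from the writer's concrete type via a comma-ok type assertion instead of tracking a parallel bool. A standalone sketch of the idiom, substituting the standard *gzip.Writer for the exporter's *cancellableGzipWriter:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// compressionEnabled reports whether w is the gzip-backed writer,
// the same check bufferState now performs on *cancellableGzipWriter.
func compressionEnabled(w io.Writer) bool {
	_, ok := w.(*gzip.Writer)
	return ok
}

func main() {
	var buf bytes.Buffer
	fmt.Println(compressionEnabled(&buf))                 // false: plain byte buffer
	fmt.Println(compressionEnabled(gzip.NewWriter(&buf))) // true: gzip path active
}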
14 changes: 7 additions & 7 deletions exporter/splunkhecexporter/client.go
@@ -180,14 +180,14 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers
 	}
 
 	// There's some leftover unsent non-profiling data
-	if bufState != nil && bufState.containsData {
+	if bufState != nil && bufState.containsData() {
 		if err := c.postEvents(ctx, bufState, headers); err != nil {
 			return consumererror.NewLogs(err, c.subLogs(ld, bufState, profilingBufState))
 		}
 	}
 
 	// There's some leftover unsent profiling data
-	if profilingBufState != nil && profilingBufState.containsData {
+	if profilingBufState != nil && profilingBufState.containsData() {
 		if err := c.postEvents(ctx, profilingBufState, profilingLocalHeaders); err != nil {
 			// Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above.
 			return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState))

@@ -231,7 +231,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice,
 			continue
 		}
 
-		if state.containsData {
+		if state.containsData() {
 			if err := c.postEvents(ctx, state, headers); err != nil {
 				return permanentErrors, err
 			}

@@ -297,7 +297,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet
 			continue
 		}
 
-		if state.containsData {
+		if state.containsData() {
 			if err := c.postEvents(ctx, state, headers); err != nil {
 				return permanentErrors, err
 			}

@@ -350,7 +350,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli
 			continue
 		}
 
-		if state.containsData {
+		if state.containsData() {
 			if err = c.postEvents(ctx, state, headers); err != nil {
 				return permanentErrors, err
 			}

@@ -403,7 +403,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
 	}
 
 	// There's some leftover unsent metrics
-	if bufState.containsData {
+	if bufState.containsData() {
 		if err := c.postEvents(ctx, bufState, headers); err != nil {
 			return consumererror.NewMetrics(err, subMetrics(md, bufState))
 		}

@@ -438,7 +438,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces,
 	}
 
 	// There's some leftover unsent traces
-	if bufState.containsData {
+	if bufState.containsData() {
 		if err := c.postEvents(ctx, bufState, headers); err != nil {
 			return consumererror.NewTraces(err, subTraces(td, bufState))
 		}
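Every call site above follows the same flush-leftovers shape: after the batching loop, whatever remains is posted only if containsData() reports true, so no separate flag has to be kept in sync. A minimal sketch of the pattern under assumed names (flushBatch and maxBatch are illustrative, not the exporter's API):

package main

import "fmt"

const maxBatch = 3

// flushBatch stands in for client.postEvents.
func flushBatch(batch []string) {
	fmt.Println("posting", batch)
}

func main() {
	events := []string{"a", "b", "c", "d", "e"}
	var batch []string
	for _, e := range events {
		batch = append(batch, e)
		if len(batch) == maxBatch {
			flushBatch(batch)
			batch = batch[:0]
		}
	}
	// Leftover unsent data: len(batch) > 0 plays the role of
	// bufferState.containsData(), state derived from the data itself.
	if len(batch) > 0 {
		flushBatch(batch)
	}
}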
2 changes: 1 addition & 1 deletion exporter/splunkhecexporter/hec_worker.go
@@ -53,7 +53,7 @@ func (hec *defaultHecWorker) send(ctx context.Context, bufferState *bufferState,
 		req.Header.Set(k, v)
 	}
 
-	if bufferState.compressionEnabled {
+	if bufferState.compressionEnabled() {
 		req.Header.Set("Content-Encoding", "gzip")
 	}
 
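A hedged sketch of the send-side effect (newHECRequest is an assumed helper, not the exporter's API): the Content-Encoding header is set only when the payload actually went through the gzip path, which compressionEnabled() now reports straight from the writer type.

package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// newHECRequest builds a POST the way defaultHecWorker.send does,
// advertising gzip only when the body was actually compressed.
func newHECRequest(ctx context.Context, url string, payload []byte, compressed bool) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	if compressed { // bufferState.compressionEnabled() in the exporter
		req.Header.Set("Content-Encoding", "gzip")
	}
	return req, nil
}

func main() {
	// Example endpoint; any URL works for building the request.
	req, _ := newHECRequest(context.Background(), "http://localhost:8088/services/collector", []byte("{}"), true)
	fmt.Println(req.Header.Get("Content-Encoding")) // gzip
}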
