Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/splunk_hec] Apply compression if it's enabled in the config #22969

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/splunk-hec-apply-compression.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/splunk_hec

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Apply compression to Splunk HEC payload unconditionally if it's enabled in the config.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [22969, 22018]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The compression used to be enabled only if the payload size was greater than 1.5KB which significantly
complicated the logic and made it hard to test. This change makes the compression unconditionally applied to
the payload if it's enabled in the config. The benchmarking shows improvements in the throughput and CPU usage for
large payloads and expected degradation for small payloads which is acceptable given that it's not a common case.

136 changes: 67 additions & 69 deletions exporter/splunkhecexporter/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ var (
errOverCapacity = errors.New("over capacity")
)

// Minimum number of bytes to compress. 1500 is the MTU of an ethernet frame.
const minCompressionLen = 1500

// bufferState encapsulates intermediate buffer state when pushing data
type bufferState struct {
compressionAvailable bool
bufferMaxLen uint
maxEventLength uint
writer io.Writer
buf *bytes.Buffer
jsonStream *jsoniter.Stream
rawLength int
maxEventLength uint
buf buffer
jsonStream *jsoniter.Stream
rawLength int
}

type buffer interface {
io.Writer
io.Reader
io.Closer
Reset()
Len() int
}

func (b *bufferState) compressionEnabled() bool {
_, ok := b.writer.(*cancellableGzipWriter)
_, ok := b.buf.(*cancellableGzipWriter)
return ok
}

Expand All @@ -42,9 +44,6 @@ func (b *bufferState) containsData() bool {

func (b *bufferState) reset() {
b.buf.Reset()
if _, ok := b.writer.(*cancellableBytesWriter); !ok {
b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen}
}
b.rawLength = 0
}

Expand All @@ -53,64 +52,23 @@ func (b *bufferState) Read(p []byte) (n int, err error) {
}

func (b *bufferState) Close() error {
if _, ok := b.writer.(*cancellableGzipWriter); ok {
return b.writer.(*cancellableGzipWriter).close()
}
return nil
return b.buf.Close()
}

// accept returns true if data is accepted by the buffer
func (b *bufferState) accept(data []byte) (bool, error) {
if len(data)+b.rawLength > int(b.maxEventLength) {
return false, nil
}
_, err := b.writer.Write(data)
overCapacity := errors.Is(err, errOverCapacity)
bufLen := b.buf.Len()
if overCapacity {
bufLen += len(data)
}
if b.compressionAvailable && !b.compressionEnabled() && bufLen > minCompressionLen {
// switch over to a zip buffer.
tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding))
writer := gzip.NewWriter(tmpBuf)
writer.Reset(tmpBuf)
zipWriter := &cancellableGzipWriter{
innerBuffer: tmpBuf,
innerWriter: writer,
// 8 bytes required for the zip footer.
maxCapacity: b.bufferMaxLen - 8,
}

if b.bufferMaxLen == 0 {
zipWriter.maxCapacity = 0
}

// we write the bytes buffer into the zip buffer. Any error from this is I/O, and should stop the process.
if _, err2 := zipWriter.Write(b.buf.Bytes()); err2 != nil {
return false, err2
}
b.writer = zipWriter
b.buf = tmpBuf
// if the byte writer was over capacity, try to write the new entry in the zip writer:
if overCapacity {
if _, err2 := zipWriter.Write(data); err2 != nil {
overCapacity2 := errors.Is(err2, errOverCapacity)
if overCapacity2 {
return false, nil
}
return false, err2
}

}
_, err := b.buf.Write(data)
if err == nil {
b.rawLength += len(data)
return true, nil
}
if overCapacity {
if errors.Is(err, errOverCapacity) {
return false, nil
}
b.rawLength += len(data)
return true, err
return false, err
}

type cancellableBytesWriter struct {
Expand All @@ -128,6 +86,22 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) {
return c.innerWriter.Write(b)
}

func (c *cancellableBytesWriter) Read(p []byte) (int, error) {
return c.innerWriter.Read(p)
}

func (c *cancellableBytesWriter) Reset() {
c.innerWriter.Reset()
}

func (c *cancellableBytesWriter) Close() error {
return nil
}

func (c *cancellableBytesWriter) Len() int {
return c.innerWriter.Len()
}

type cancellableGzipWriter struct {
innerBuffer *bytes.Buffer
innerWriter *gzip.Writer
Expand Down Expand Up @@ -168,10 +142,24 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
return c.innerWriter.Write(b)
}

func (c *cancellableGzipWriter) close() error {
func (c *cancellableGzipWriter) Read(p []byte) (int, error) {
return c.innerBuffer.Read(p)
}

func (c *cancellableGzipWriter) Reset() {
c.innerBuffer.Reset()
c.innerWriter.Reset(c.innerBuffer)
c.len = 0
}

func (c *cancellableGzipWriter) Close() error {
return c.innerWriter.Close()
}

func (c *cancellableGzipWriter) Len() int {
return c.innerBuffer.Len()
}

// bufferStatePool is a pool of bufferState objects.
type bufferStatePool struct {
pool *sync.Pool
Expand All @@ -189,18 +177,28 @@ func (p bufferStatePool) put(bf *bufferState) {

const initBufferCap = 512

func newBufferStatePool(bufCap uint, compressionAvailable bool, maxEventLength uint) bufferStatePool {
func newBufferStatePool(bufCap uint, compressionEnabled bool, maxEventLength uint) bufferStatePool {
return bufferStatePool{
&sync.Pool{
New: func() interface{} {
buf := bytes.NewBuffer(make([]byte, 0, initBufferCap))
innerBuffer := bytes.NewBuffer(make([]byte, 0, initBufferCap))
var buf buffer
if compressionEnabled {
buf = &cancellableGzipWriter{
innerBuffer: innerBuffer,
innerWriter: gzip.NewWriter(buf),
maxCapacity: bufCap,
}
} else {
buf = &cancellableBytesWriter{
innerWriter: innerBuffer,
maxCapacity: bufCap,
}
}
return &bufferState{
compressionAvailable: compressionAvailable,
writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap},
buf: buf,
jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
bufferMaxLen: bufCap,
maxEventLength: maxEventLength,
buf: buf,
jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
maxEventLength: maxEventLength,
}
},
},
Expand Down
8 changes: 4 additions & 4 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), bs.bufferMaxLen)))
" content length %d bytes", len(b), c.config.MaxContentLengthLogs)))
return iterState{i, j, k + 1, false}, permanentErrors
}
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), bs.bufferMaxLen)))
" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
return iterState{i, j, k + 1, false}, permanentErrors
}
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), bs.bufferMaxLen)))
" content length %d bytes", len(b), c.config.MaxContentLengthTraces)))
return iterState{i, j, k + 1, false}, permanentErrors
}
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces,
}

func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers map[string]string) error {
if err := bufState.Close(); err != nil {
if err := bufState.buf.Close(); err != nil {
return err
}
return c.hecWorker.send(ctx, bufState, headers)
Expand Down
Loading