Revert "What if we used zstd in our trace writers (#23806)" (#25278)
This reverts commit e496f86.
ajgajg1134 committed May 2, 2024
1 parent 65cc71c commit 32187be
Showing 4 changed files with 53 additions and 98 deletions.
1 change: 0 additions & 1 deletion pkg/trace/go.mod
@@ -20,7 +20,6 @@ require (
github.com/DataDog/datadog-go/v5 v5.5.0
github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.14.0
github.com/DataDog/sketches-go v1.4.2
-github.com/DataDog/zstd v1.5.5
github.com/Microsoft/go-winio v0.6.1
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575
github.com/davecgh/go-spew v1.1.1
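The dropped module is the cgo-backed zstd binding that only the reverted compression path needed. For context, below is a minimal sketch of the flag-gated compressor selection that the rest of this diff removes; newCompressor is a hypothetical helper, the bool argument stands in for cfg.HasFeature("zstd-encoding"), and a bare bytes.Buffer stands in for the payload body.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"

	"github.com/DataDog/zstd"
)

// newCompressor mirrors the reverted selection: zstd when the feature flag is
// on, gzip otherwise. It returns the writer plus the Content-Encoding value
// the payload headers would carry.
func newCompressor(body *bytes.Buffer, useZstd bool) (io.WriteCloser, string, error) {
	if useZstd {
		return zstd.NewWriterLevel(body, zstd.BestSpeed), "zstd", nil
	}
	gzipw, err := gzip.NewWriterLevel(body, gzip.BestSpeed)
	return gzipw, "gzip", err
}

func main() {
	var body bytes.Buffer
	// false plays the role of the "zstd-encoding" feature flag after the revert.
	w, encoding, err := newCompressor(&body, false)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write([]byte("serialized trace payload")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Content-Encoding: %s, %d compressed bytes\n", encoding, body.Len())
}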
2 changes: 0 additions & 2 deletions pkg/trace/go.sum

Some generated files are not rendered by default.

56 changes: 17 additions & 39 deletions pkg/trace/writer/trace.go
@@ -8,7 +8,6 @@ package writer
import (
"compress/gzip"
"errors"
-"io"
"runtime"
"strings"
"sync"
@@ -22,7 +21,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/timing"

"github.com/DataDog/datadog-go/v5/statsd"
-"github.com/DataDog/zstd"
)

// pathTraces is the target host API path for delivering traces.
@@ -71,7 +69,7 @@ type TraceWriter struct {
senders []*sender
stop chan struct{}
stats *info.TraceWriterInfo
-wg sync.WaitGroup // waits for compressors
+wg sync.WaitGroup // waits for gzippers
tick time.Duration // flush frequency
agentVersion string

@@ -87,7 +85,6 @@ type TraceWriter struct {
easylog *log.ThrottledLogger
statsd statsd.ClientInterface
timing timing.Reporter
-useZstd bool
}

// NewTraceWriter returns a new TraceWriter. It is created for the given agent configuration and
@@ -118,7 +115,6 @@ func NewTraceWriter(
telemetryCollector: telemetryCollector,
statsd: statsd,
timing: timing,
-useZstd: cfg.HasFeature("zstd-encoding"),
}
climit := cfg.TraceWriter.ConnectionLimit
if climit == 0 {
@@ -286,43 +282,25 @@ func (w *TraceWriter) serializer() {
}

w.stats.BytesUncompressed.Add(int64(len(b)))
-var p *payload
-var writer io.WriteCloser
-
-if w.useZstd {
-p = newPayload(map[string]string{
-"Content-Type": "application/x-protobuf",
-"Content-Encoding": "zstd",
-headerLanguages: strings.Join(info.Languages(), "|"),
-})
-
-p.body.Grow(len(b) / 2)
-writer = zstd.NewWriterLevel(p.body, zstd.BestSpeed)
-
-} else {
-p = newPayload(map[string]string{
-"Content-Type": "application/x-protobuf",
-"Content-Encoding": "gzip",
-headerLanguages: strings.Join(info.Languages(), "|"),
-})
-p.body.Grow(len(b) / 2)
-
-writer, err = gzip.NewWriterLevel(p.body, gzip.BestSpeed)
-if err != nil {
-// it will never happen, unless an invalid compression is chosen;
-// we know gzip.BestSpeed is valid.
-log.Errorf("Failed to compress trace paylod: %s", err)
-return
-}
+p := newPayload(map[string]string{
+"Content-Type": "application/x-protobuf",
+"Content-Encoding": "gzip",
+headerLanguages: strings.Join(info.Languages(), "|"),
+})
+p.body.Grow(len(b) / 2)
+gzipw, err := gzip.NewWriterLevel(p.body, gzip.BestSpeed)
+if err != nil {
+// it will never happen, unless an invalid compression is chosen;
+// we know gzip.BestSpeed is valid.
+log.Errorf("gzip.NewWriterLevel: %d", err)
+return
+}

-if _, err := writer.Write(b); err != nil {
-log.Errorf("Error compressing trace payload: %v", err)
+if _, err := gzipw.Write(b); err != nil {
+log.Errorf("Error gzipping trace payload: %v", err)
}
-if err := writer.Close(); err != nil {
-log.Errorf("Error closing compressed stream when writing trace payload: %v", err)
+if err := gzipw.Close(); err != nil {
+log.Errorf("Error closing gzip stream when writing trace payload: %v", err)
}

sendPayloads(w.senders, p, w.syncMode)
}()
}
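After the revert, the serializer is back to the single gzip path shown above. Below is a minimal, self-contained sketch of that path; compressTracePayload is a hypothetical stand-in, since the real code writes into the payload's body and also sets the Content-Encoding: gzip header.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

// compressTracePayload compresses serialized bytes the way the restored code
// does: pre-grow the buffer, write through a BestSpeed gzip writer, and Close
// to flush the gzip trailer.
func compressTracePayload(b []byte) (*bytes.Buffer, error) {
	body := &bytes.Buffer{}
	// Pre-sizing to len(b)/2 mirrors the diff's rough assumption that gzip
	// halves a protobuf trace payload.
	body.Grow(len(b) / 2)
	gzipw, err := gzip.NewWriterLevel(body, gzip.BestSpeed)
	if err != nil {
		// Only reachable with an invalid level; gzip.BestSpeed is valid.
		return nil, err
	}
	if _, err := gzipw.Write(b); err != nil {
		return nil, err
	}
	if err := gzipw.Close(); err != nil {
		return nil, err
	}
	return body, nil
}

func main() {
	body, err := compressTracePayload([]byte("serialized pb.AgentPayload bytes"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("compressed to %d bytes\n", body.Len())
}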
92 changes: 36 additions & 56 deletions pkg/trace/writer/trace_test.go
@@ -7,13 +7,11 @@ package writer

import (
"compress/gzip"
-"fmt"
"io"
"runtime"
"sync"
"testing"

-"github.com/DataDog/zstd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
@@ -44,45 +42,38 @@ func (s MockSampler) GetTargetTPS() float64 {
var mockSampler = MockSampler{TargetTPS: 5, Enabled: true}

func TestTraceWriter(t *testing.T) {
-testCases := []bool{false, true}
+srv := newTestServer()
+cfg := &config.AgentConfig{
+Hostname: testHostname,
+DefaultEnv: testEnv,
+Endpoints: []*config.Endpoint{{
+APIKey: "123",
+Host: srv.URL,
+}},
+TraceWriter: &config.WriterConfig{ConnectionLimit: 200, QueueSize: 40},
+}

-for _, tc := range testCases {
-srv := newTestServer()
-cfg := &config.AgentConfig{
-Hostname: testHostname,
-DefaultEnv: testEnv,
-Endpoints: []*config.Endpoint{{
-APIKey: "123",
-Host: srv.URL,
-}},
-TraceWriter: &config.WriterConfig{ConnectionLimit: 200, QueueSize: 40},
+t.Run("ok", func(t *testing.T) {
+testSpans := []*SampledChunks{
+randomSampledSpans(20, 8),
+randomSampledSpans(10, 0),
+randomSampledSpans(40, 5),
+}
-if tc {
-cfg.Features = map[string]struct{}{"zstd-encoding": {}}
+// Use a flush threshold that allows the first two entries to not overflow,
+// but overflow on the third.
+defer useFlushThreshold(testSpans[0].Size + testSpans[1].Size + 10)()
+tw := NewTraceWriter(cfg, mockSampler, mockSampler, mockSampler, telemetry.NewNoopCollector(), &statsd.NoOpClient{}, &timing.NoopReporter{})
+tw.In = make(chan *SampledChunks)
+go tw.Run()
+for _, ss := range testSpans {
+tw.In <- ss
+}

-t.Run(fmt.Sprintf("zstd_%t", tc), func(t *testing.T) {
-testSpans := []*SampledChunks{
-randomSampledSpans(20, 8),
-randomSampledSpans(10, 0),
-randomSampledSpans(40, 5),
-}
-// Use a flush threshold that allows the first two entries to not overflow,
-// but overflow on the third.
-defer useFlushThreshold(testSpans[0].Size + testSpans[1].Size + 10)()
-tw := NewTraceWriter(cfg, mockSampler, mockSampler, mockSampler, telemetry.NewNoopCollector(), &statsd.NoOpClient{}, &timing.NoopReporter{})
-tw.In = make(chan *SampledChunks)
-go tw.Run()
-for _, ss := range testSpans {
-tw.In <- ss
-}
-tw.Stop()
-// One payload flushes due to overflowing the threshold, and the second one
-// because of stop.
-assert.Equal(t, 2, srv.Accepted())
-payloadsContain(t, srv.Payloads(), testSpans, tc)
-})
-}
+tw.Stop()
+// One payload flushes due to overflowing the threshold, and the second one
+// because of stop.
+assert.Equal(t, 2, srv.Accepted())
+payloadsContain(t, srv.Payloads(), testSpans)
+})
}

func TestTraceWriterMultipleEndpointsConcurrent(t *testing.T) {
@@ -131,7 +122,7 @@ func TestTraceWriterMultipleEndpointsConcurrent(t *testing.T) {

wg.Wait()
tw.Stop()
-payloadsContain(t, srv.Payloads(), testSpans, false)
+payloadsContain(t, srv.Payloads(), testSpans)
}

// useFlushThreshold sets n as the number of bytes to be used as the flush threshold
@@ -154,25 +145,14 @@ func randomSampledSpans(spans, events int) *SampledChunks {
}

// payloadsContain checks that the given payloads contain the given set of sampled spans.
-func payloadsContain(t *testing.T, payloads []*payload, sampledSpans []*SampledChunks, shouldUseZstd bool) {
+func payloadsContain(t *testing.T, payloads []*payload, sampledSpans []*SampledChunks) {
t.Helper()
var all pb.AgentPayload
for _, p := range payloads {
assert := assert.New(t)
-var slurp []byte
-var err error
-var reader io.ReadCloser
-
-if shouldUseZstd {
-reader = zstd.NewReader(p.body)
-assert.NotNil(reader)
-} else {
-reader, err = gzip.NewReader(p.body)
-assert.NoError(err)
-}
-
-slurp, err = io.ReadAll(reader)
-
+gzipr, err := gzip.NewReader(p.body)
+assert.NoError(err)
+slurp, err := io.ReadAll(gzipr)
assert.NoError(err)
var payload pb.AgentPayload
err = proto.Unmarshal(slurp, &payload)
@@ -227,7 +207,7 @@ func TestTraceWriterFlushSync(t *testing.T) {
tw.FlushSync()
// Now all trace payloads should be sent
assert.Equal(t, 1, srv.Accepted())
-payloadsContain(t, srv.Payloads(), testSpans, false)
+payloadsContain(t, srv.Payloads(), testSpans)
})
}

@@ -296,7 +276,7 @@ func TestTraceWriterSyncStop(t *testing.T) {
tw.Stop()
// Now all trace payloads should be sent
assert.Equal(t, 1, srv.Accepted())
-payloadsContain(t, srv.Payloads(), testSpans, false)
+payloadsContain(t, srv.Payloads(), testSpans)
})
}
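With the zstd branch gone, the tests assume every payload body is a plain gzip stream. Below is a minimal round-trip sketch of what payloadsContain now relies on; the protobuf step is left out to keep it self-contained.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

func main() {
	original := []byte("serialized pb.AgentPayload bytes")

	// Compress the way the writer does after the revert.
	var body bytes.Buffer
	gzipw, err := gzip.NewWriterLevel(&body, gzip.BestSpeed)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := gzipw.Write(original); err != nil {
		log.Fatal(err)
	}
	if err := gzipw.Close(); err != nil {
		log.Fatal(err)
	}

	// Decompress the way payloadsContain does; in the real test, proto.Unmarshal
	// into pb.AgentPayload would follow instead of this byte comparison.
	gzipr, err := gzip.NewReader(&body)
	if err != nil {
		log.Fatal(err)
	}
	slurp, err := io.ReadAll(gzipr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(bytes.Equal(slurp, original)) // true
}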
