Skip to content

Commit

Permalink
feat(influxdbexporter): limit size of payload (open-telemetry#24001)
Browse files Browse the repository at this point in the history
InfluxDB often responds to exporter write requests with `413 Request
Entity Too Large` because the line protocol payload is too large. (The
otelcol batch processor can be used to limit batch size, but not in
terms of bytes out put to InfluxDB.)

To fix this, the influxdbexporter should self-limit based on both bytes
and lines of line protocol, defaulting to [the documented suggested
limits](https://docs.influxdata.com/influxdb/cloud-serverless/write-data/best-practices/optimize-writes/#batch-writes)
of 10,000 lines and 10MB. The service hard limit is 50MB [as documented
here](https://docs.influxdata.com/influxdb/cloud-serverless/api/#operation/PostWrite).

Also tracked here:
influxdata/influxdb-observability#262
  • Loading branch information
jacobmarble committed Jul 10, 2023
1 parent 4874893 commit 4e3c466
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 21 deletions.
20 changes: 20 additions & 0 deletions .chloggen/jgm-influxdbexporter-payload-limits.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: influxdbexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: limit size of write payload

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24001]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ require (
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 // indirect
github.com/influxdata/influxdb-observability/common v0.5.2 // indirect
github.com/influxdata/influxdb-observability/influx2otel v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.3-0.20230705233630-3cfabd1d00b9 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/ionos-cloud/sdk-go/v6 v6.1.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ require (
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 // indirect
github.com/influxdata/influxdb-observability/common v0.5.2 // indirect
github.com/influxdata/influxdb-observability/influx2otel v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.3-0.20230705233630-3cfabd1d00b9 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/ionos-cloud/sdk-go/v6 v6.1.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions exporter/influxdbexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ The following configuration options are supported:
* `username` (optional) Basic auth username for authenticating with InfluxDB v1.x
* `password` (optional) Basic auth password for authenticating with InfluxDB v1.x
* `span_dimensions` (default = service.name, span.name) Span attributes to use as dimensions (InfluxDB tags)
* `payload_max_lines` (default = 10_000) Maximum number of lines allowed per HTTP POST request
* `payload_max_bytes` (default = 10_000_000) Maximum number of bytes allowed per HTTP POST request
* `metrics_schema` (default = telegraf-prometheus-v1) The chosen metrics schema to write; must be one of:
* `telegraf-prometheus-v1`
* `telegraf-prometheus-v2`
Expand Down
5 changes: 5 additions & 0 deletions exporter/influxdbexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ type Config struct {
// - telegraf-prometheus-v1
// - telegraf-prometheus-v2
MetricsSchema string `mapstructure:"metrics_schema"`

// PayloadMaxLines is the maximum number of line protocol lines to POST in a single request.
PayloadMaxLines int `mapstructure:"payload_max_lines"`
// PayloadMaxBytes is the maximum number of line protocol bytes to POST in a single request.
PayloadMaxBytes int `mapstructure:"payload_max_bytes"`
}

func (cfg *Config) Validate() error {
Expand Down
12 changes: 7 additions & 5 deletions exporter/influxdbexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
Org: "my-org",
Bucket: "my-bucket",
Token: "my-token",
SpanDimensions: []string{"service.name", "span.name"},
MetricsSchema: "telegraf-prometheus-v1",
Org: "my-org",
Bucket: "my-bucket",
Token: "my-token",
SpanDimensions: []string{"service.name", "span.name"},
MetricsSchema: "telegraf-prometheus-v1",
PayloadMaxLines: 72,
PayloadMaxBytes: 27,
},
},
}
Expand Down
4 changes: 4 additions & 0 deletions exporter/influxdbexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func createDefaultConfig() component.Config {
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
MetricsSchema: common.MetricsSchemaTelegrafPrometheusV1.String(),
SpanDimensions: otel2influx.DefaultOtelTracesToLineProtocolConfig().SpanDimensions,
// defaults per suggested:
// https://docs.influxdata.com/influxdb/cloud-serverless/write-data/best-practices/optimize-writes/#batch-writes
PayloadMaxLines: 10_000,
PayloadMaxBytes: 10_000_000,
}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/influxdbexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/influxdata/influxdb-observability/common v0.5.2
github.com/influxdata/influxdb-observability/otel2influx v0.5.2
github.com/influxdata/influxdb-observability/otel2influx v0.5.3-0.20230705233630-3cfabd1d00b9
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.81.0
Expand Down
4 changes: 2 additions & 2 deletions exporter/influxdbexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions exporter/influxdbexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ influxdb/override-config:
span_dimensions:
- service.name
- span.name
payload_max_lines: 72
payload_max_bytes: 27
30 changes: 26 additions & 4 deletions exporter/influxdbexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type influxHTTPWriter struct {
httpClientSettings confighttp.HTTPClientSettings
telemetrySettings component.TelemetrySettings
writeURL string
payloadMaxLines int
payloadMaxBytes int

logger common.Logger
}
Expand All @@ -55,6 +57,8 @@ func newInfluxHTTPWriter(logger common.Logger, config *Config, telemetrySettings
httpClientSettings: config.HTTPClientSettings,
telemetrySettings: telemetrySettings,
writeURL: writeURL,
payloadMaxLines: config.PayloadMaxLines,
payloadMaxBytes: config.PayloadMaxBytes,
logger: logger,
}, nil
}
Expand Down Expand Up @@ -120,20 +124,24 @@ var _ otel2influx.InfluxWriterBatch = (*influxHTTPWriterBatch)(nil)

type influxHTTPWriterBatch struct {
*influxHTTPWriter
encoder *lineprotocol.Encoder
encoder *lineprotocol.Encoder
payloadLines int
}

func newInfluxHTTPWriterBatch(w *influxHTTPWriter) *influxHTTPWriterBatch {
return &influxHTTPWriterBatch{
influxHTTPWriter: w,
encoder: w.encoderPool.Get().(*lineprotocol.Encoder),
}
}

// EnqueuePoint emits a set of line protocol attributes (metrics, tags, fields, timestamp)
// to the internal line protocol buffer.
// Errors are always "permanent".
func (b *influxHTTPWriterBatch) EnqueuePoint(measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time, _ common.InfluxMetricValueType) error {
// If the buffer is full, it will be flushed by calling WriteBatch.
func (b *influxHTTPWriterBatch) EnqueuePoint(ctx context.Context, measurement string, tags map[string]string, fields map[string]interface{}, ts time.Time, _ common.InfluxMetricValueType) error {
if b.encoder == nil {
b.encoder = b.encoderPool.Get().(*lineprotocol.Encoder)
}

b.encoder.StartLine(measurement)
for _, tag := range b.optimizeTags(tags) {
b.encoder.AddTag(tag.k, tag.v)
Expand All @@ -146,19 +154,33 @@ func (b *influxHTTPWriterBatch) EnqueuePoint(measurement string, tags map[string
if err := b.encoder.Err(); err != nil {
b.encoder.Reset()
b.encoder.ClearErr()
b.encoderPool.Put(b.encoder)
b.encoder = nil
return consumererror.NewPermanent(fmt.Errorf("failed to encode point: %w", err))
}

b.payloadLines++
if b.payloadLines >= b.payloadMaxLines || len(b.encoder.Bytes()) >= b.payloadMaxBytes {
if err := b.WriteBatch(ctx); err != nil {
return err
}
}

return nil
}

// WriteBatch sends the internal line protocol buffer to InfluxDB.
func (b *influxHTTPWriterBatch) WriteBatch(ctx context.Context) error {
if b.encoder == nil {
return nil
}

defer func() {
b.encoder.Reset()
b.encoder.ClearErr()
b.encoderPool.Put(b.encoder)
b.encoder = nil
b.payloadLines = 0
}()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.writeURL, bytes.NewReader(b.encoder.Bytes()))
Expand Down
75 changes: 75 additions & 0 deletions exporter/influxdbexporter/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
package influxdbexporter

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/influxdata/influxdb-observability/common"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_influxHTTPWriterBatch_optimizeTags(t *testing.T) {
Expand Down Expand Up @@ -66,3 +73,71 @@ func Test_influxHTTPWriterBatch_optimizeTags(t *testing.T) {
})
}
}

func Test_influxHTTPWriterBatch_maxPayload(t *testing.T) {
for _, testCase := range []struct {
name string
payloadMaxLines int
payloadMaxBytes int

expectMultipleRequests bool
}{{
name: "default",
payloadMaxLines: 10_000,
payloadMaxBytes: 10_000_000,

expectMultipleRequests: false,
}, {
name: "limit-lines",
payloadMaxLines: 1,
payloadMaxBytes: 10_000_000,

expectMultipleRequests: true,
}, {
name: "limit-bytes",
payloadMaxLines: 10_000,
payloadMaxBytes: 1,

expectMultipleRequests: true,
}} {
t.Run(testCase.name, func(t *testing.T) {
var httpRequests []*http.Request

mockHTTPService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpRequests = append(httpRequests, r)
}))
t.Cleanup(mockHTTPService.Close)

batch := &influxHTTPWriterBatch{
influxHTTPWriter: &influxHTTPWriter{
encoderPool: sync.Pool{
New: func() interface{} {
e := new(lineprotocol.Encoder)
e.SetLax(false)
e.SetPrecision(lineprotocol.Nanosecond)
return e
},
},
httpClient: &http.Client{},
writeURL: mockHTTPService.URL,
payloadMaxLines: testCase.payloadMaxLines,
payloadMaxBytes: testCase.payloadMaxBytes,
logger: common.NoopLogger{},
},
}

err := batch.EnqueuePoint(context.Background(), "m", map[string]string{"k": "v"}, map[string]interface{}{"f": int64(1)}, time.Unix(1, 0), 0)
require.NoError(t, err)
err = batch.EnqueuePoint(context.Background(), "m", map[string]string{"k": "v"}, map[string]interface{}{"f": int64(2)}, time.Unix(2, 0), 0)
require.NoError(t, err)
err = batch.WriteBatch(context.Background())
require.NoError(t, err)

if testCase.expectMultipleRequests {
assert.Equal(t, 2, len(httpRequests))
} else {
assert.Equal(t, 1, len(httpRequests))
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ require (
github.com/influxdata/go-syslog/v3 v3.0.1-0.20210608084020-ac565dc76ba6 // indirect
github.com/influxdata/influxdb-observability/common v0.5.2 // indirect
github.com/influxdata/influxdb-observability/influx2otel v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.2 // indirect
github.com/influxdata/influxdb-observability/otel2influx v0.5.3-0.20230705233630-3cfabd1d00b9 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/ionos-cloud/sdk-go/v6 v6.1.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4e3c466

Please sign in to comment.