
[exporter/prometheusremotewrite] prometheusremotewrite exporter add option to send metadata #27565

Merged
Changes from 16 commits (26 commits total)
78b1d28
feat: prometheus remote write send metadata
Sep 30, 2023
14d0c3b
chore: added config option for sending medatada
Oct 9, 2023
54fc699
chore: send metadata in separate requests
Oct 9, 2023
3343077
chore: ran make goporto to make the pipeline pass
Oct 28, 2023
0d27a44
chore: add missing license to the file
Oct 28, 2023
44c22dd
chore: more fixes for pipeline failing
Oct 29, 2023
afae26f
chore: more fixes for pipeline failing
Oct 29, 2023
0708868
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Oct 30, 2023
9e2d10d
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Nov 9, 2023
4663b68
chore: disable sending metadata by default
Nov 9, 2023
dfa93e0
chore: update remote write exporter readme
Nov 9, 2023
2c724f9
chore: added unit tests
Nov 9, 2023
c3c4d02
chore: added changelog entry
Nov 9, 2023
62588b8
chore: make linter happy
Nov 9, 2023
b0e946c
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Nov 10, 2023
a019771
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Nov 11, 2023
41c68e7
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 15, 2023
ec03854
chore: update comment copied comment
Nov 15, 2023
7ae3f82
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 19, 2023
c47cea6
chore: add separate method for creating metadata requests
Nov 19, 2023
a05570c
chore: fix metadata length calculation
Nov 19, 2023
90c79d7
chore: fix pass metadata requests
Nov 19, 2023
cc196be
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Nov 19, 2023
9f7f769
chore: update call of a function
Nov 19, 2023
cfaa046
chore: don't set unit in metadata for now
Nov 20, 2023
6f07237
Merge branch 'main' into jm-feat-prometheus-remote-write-send-metadata-2
Nov 21, 2023
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: prometheusremotewrite exporter add option to send metadata

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [ 13849 ]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/README.md
@@ -52,6 +52,7 @@ The following settings can be optionally configured:
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `add_metric_suffixes`: If set to false, type and unit suffixes will not be added to metrics. Default: true.
- `send_metadata`: If set to true, Prometheus metadata will be generated and sent. Default: false.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `enabled`: enable the sending queue (default: `true`)
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
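For context, the new flag sits alongside the exporter's other optional settings. A minimal collector config sketch enabling it (the endpoint is a placeholder, not from this PR):

```yaml
exporters:
  prometheusremotewrite:
    endpoint: "https://prometheus.example.com/api/v1/write"
    # Opt in to sending Prometheus metadata; the PR defaults this to false.
    send_metadata: true
```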
3 changes: 3 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
@@ -48,6 +48,9 @@ type Config struct {

// AddMetricSuffixes controls whether unit and type suffixes are added to metrics on export
AddMetricSuffixes bool `mapstructure:"add_metric_suffixes"`

// SendMetadata controls whether prometheus metadata will be generated and sent
SendMetadata bool `mapstructure:"send_metadata"`
}

type CreatedMetric struct {
14 changes: 11 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -77,6 +77,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err
DisableTargetInfo: !cfg.TargetInfo.Enabled,
ExportCreatedMetric: cfg.CreatedMetric.Enabled,
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
}
if cfg.WAL == nil {
@@ -130,12 +131,19 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
}

if prwe.exporterSettings.SendMetadata {
m := prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap))
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, nil))
}
}

@@ -151,14 +159,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter_test.go
@@ -368,7 +368,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error {
return err
}

return prwe.handleExport(context.Background(), testmap)
return prwe.handleExport(context.Background(), testmap, nil)
}

// Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as
@@ -919,7 +919,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) {
"timeseries1": ts1,
"timeseries2": ts2,
}
errs := prwe.handleExport(ctx, tsMap)
errs := prwe.handleExport(ctx, tsMap, nil)
assert.NoError(t, errs)
// Shutdown after we've written to the WAL. This ensures that our
// exported data in-flight will be flushed to the WAL before exiting.
1 change: 1 addition & 0 deletions exporter/prometheusremotewriteexporter/factory.go
@@ -81,6 +81,7 @@ func createDefaultConfig() component.Config {
Multiplier: backoff.DefaultMultiplier,
},
AddMetricSuffixes: true,
SendMetadata: false,
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: "https://some.url:9411/api/prom/push",
// We almost read 0 bytes, so no need to tune ReadBufferSize.
56 changes: 51 additions & 5 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -11,12 +11,12 @@ import (
)

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

requests := make([]*prompb.WriteRequest, 0, len(tsMap))
requests := make([]*prompb.WriteRequest, 0, len(tsMap)+len(m))
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
sizeOfCurrentBatch := 0

@@ -25,7 +25,7 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(tsArray)
wrapped := convertTimeseriesToRequest(tsArray, nil)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
@@ -38,16 +38,62 @@
}

if len(tsArray) != 0 {
wrapped := convertTimeseriesToRequest(tsArray)
wrapped := convertTimeseriesToRequest(tsArray, nil)
requests = append(requests, wrapped)
}

mArray := make([]prompb.MetricMetadata, 0, len(m))
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(nil, mArray)
requests = append(requests, wrapped)

mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
sizeOfCurrentBatch = 0
}

mArray = append(mArray, *v)
sizeOfCurrentBatch += sizeOfM
i++
}

if len(mArray) != 0 {
wrapped := convertTimeseriesToRequest(nil, mArray)
requests = append(requests, wrapped)
}

return requests, nil
}

func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteRequest {
func convertTimeseriesToRequest(tsArray []prompb.TimeSeries, m []prompb.MetricMetadata) *prompb.WriteRequest {
// the remote_write endpoint only requires the timeseries.
// otlp defines its own way to handle metric metadata

if m != nil && tsArray != nil {
return &prompb.WriteRequest{
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
Timeseries: orderBySampleTimestamp(tsArray),
Metadata: m,
}
}

if m != nil {
return &prompb.WriteRequest{
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
Metadata: m,
}
}

return &prompb.WriteRequest{
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
// See:
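The batching logic in helper.go packs series (and now metadata) into size-bounded write requests by flushing the current batch whenever the next item would push it past the limit. A standalone sketch of the same greedy pattern, simplified to plain int sizes (names here are illustrative, not the exporter's API):

```go
package main

import "fmt"

// batchBySize greedily packs item sizes into batches whose total stays
// below maxBatchByteSize, mirroring the flush-then-append pattern used
// by batchTimeSeries. The guard against flushing an empty batch is an
// addition for the sketch.
func batchBySize(sizes []int, maxBatchByteSize int) [][]int {
	var batches [][]int
	var current []int
	currentSize := 0
	for _, s := range sizes {
		// Flush before the running total would reach the limit.
		if currentSize+s >= maxBatchByteSize && len(current) > 0 {
			batches = append(batches, current)
			current = nil
			currentSize = 0
		}
		current = append(current, s)
		currentSize += s
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches
}

func main() {
	fmt.Println(batchBySize([]int{4, 4, 4, 4}, 10)) // [[4 4] [4 4]]
}
```

As in the PR, metadata would be batched in a second pass with the same routine, producing separate requests from the time-series batches.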
4 changes: 2 additions & 2 deletions exporter/prometheusremotewriteexporter/helper_test.go
@@ -57,7 +57,7 @@ func Test_batchTimeSeries(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize)
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
if tt.returnErr {
assert.Error(t, err)
return
@@ -116,7 +116,7 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
},
},
}
got := convertTimeseriesToRequest(outOfOrder)
got := convertTimeseriesToRequest(outOfOrder, nil)

// We must ensure that the resulting Timeseries' sample points are sorted by Timestamp.
want := &prompb.WriteRequest{
1 change: 1 addition & 0 deletions pkg/translator/prometheusremotewrite/metrics_to_prw.go
@@ -21,6 +21,7 @@ type Settings struct {
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
SendMetadata bool
}

// FromMetrics converts pmetric.Metrics to prometheus remote write format.
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pmetric"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
)

func otelMetricTypeToPromMetricType(otelMetric pmetric.Metric) prompb.MetricMetadata_MetricType {
switch otelMetric.Type() {
case pmetric.MetricTypeGauge:
return prompb.MetricMetadata_GAUGE
case pmetric.MetricTypeSum:
metricType := prompb.MetricMetadata_GAUGE
if otelMetric.Sum().IsMonotonic() {
metricType = prompb.MetricMetadata_COUNTER
}
return metricType
case pmetric.MetricTypeHistogram:
return prompb.MetricMetadata_HISTOGRAM
case pmetric.MetricTypeSummary:
return prompb.MetricMetadata_SUMMARY
case pmetric.MetricTypeExponentialHistogram:
return prompb.MetricMetadata_HISTOGRAM
}
return prompb.MetricMetadata_UNKNOWN
}

func OtelMetricsToMetadata(md pmetric.Metrics, addMetricSuffixes bool) []*prompb.MetricMetadata {
resourceMetricsSlice := md.ResourceMetrics()

metadataLength := 0
for i := 0; i < resourceMetricsSlice.Len(); i++ {
metadataLength += resourceMetricsSlice.At(i).ScopeMetrics().Len()
}
var metadata = make([]*prompb.MetricMetadata, 0, metadataLength)
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
scopeMetricsSlice := resourceMetrics.ScopeMetrics()

for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)
entry := prompb.MetricMetadata{
Type: otelMetricTypeToPromMetricType(metric),
MetricFamilyName: prometheustranslator.BuildCompliantName(metric, "", addMetricSuffixes),
Help: metric.Description(),
Unit: metric.Unit(),
}
metadata = append(metadata, &entry)
}
}
}

return metadata
}
Loading