Skip to content

Commit

Permalink
[sqlqueryreceiver] - add timestamp to metrics (open-telemetry#12088)
Browse files Browse the repository at this point in the history
Make sure metrics have timestamps. Prometheus refuses metrics whose timestamps exceed x amount of minutes and the default timestamp is yea 1970 so metrics get dropped

Co-authored-by: Dmitrii Anoshin <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
3 people authored Jul 12, 2022
1 parent 87a7ce7 commit 6b276a8
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
19 changes: 18 additions & 1 deletion receiver/sqlqueryreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ import (
"fmt"
"strconv"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

func rowToMetric(row metricRow, cfg MetricCfg, dest pmetric.Metric) error {
func rowToMetric(row metricRow, cfg MetricCfg, dest pmetric.Metric, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ScraperControllerSettings) error {
dest.SetName(cfg.MetricName)
dest.SetDescription(cfg.Description)
dest.SetUnit(cfg.Unit)
dataPointSlice := setMetricFields(cfg, dest)
dataPoint := dataPointSlice.AppendEmpty()
setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
value, found := row[cfg.ValueColumn]
if !found {
return fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn)
Expand All @@ -46,6 +49,20 @@ func rowToMetric(row metricRow, cfg MetricCfg, dest pmetric.Metric) error {
return nil
}

func setTimestamp(cfg MetricCfg, dp pmetric.NumberDataPoint, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ScraperControllerSettings) {
dp.SetTimestamp(ts)

// Cumulative sum should have a start time set to the beginning of the data points cumulation
if cfg.Aggregation == MetricAggregationCumulative && cfg.DataType != MetricDataTypeGauge {
dp.SetStartTimestamp(startTime)
}

// Non-cumulative sum should have a start time set to the previous endpoint
if cfg.Aggregation == MetricAggregationDelta && cfg.DataType != MetricDataTypeGauge {
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(ts.AsTime().Add(-scrapeCfg.CollectionInterval)))
}
}

func setMetricFields(cfg MetricCfg, dest pmetric.Metric) pmetric.NumberDataPointSlice {
var out pmetric.NumberDataPointSlice
switch cfg.DataType {
Expand Down
7 changes: 4 additions & 3 deletions receiver/sqlqueryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ func createReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clientPr
for i, query := range sqlCfg.Queries {
id := config.NewComponentIDWithName("sqlqueryreceiver", fmt.Sprintf("query-%d: %s", i, query.SQL))
mp := &scraper{
id: id,
query: query,
logger: settings.TelemetrySettings.Logger,
id: id,
query: query,
scrapeCfg: sqlCfg.ScraperControllerSettings,
logger: settings.TelemetrySettings.Logger,
dbProviderFunc: func() (*sql.DB, error) {
return sqlOpenerFunc(sqlCfg.Driver, sqlCfg.DataSource)
},
Expand Down
9 changes: 8 additions & 1 deletion receiver/sqlqueryreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"database/sql"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"
Expand All @@ -30,6 +32,8 @@ import (
type scraper struct {
id config.ComponentID
query Query
scrapeCfg scraperhelper.ScraperControllerSettings
startTime pcommon.Timestamp
clientProviderFunc clientProviderFunc
dbProviderFunc dbProviderFunc
logger *zap.Logger
Expand All @@ -50,12 +54,15 @@ func (s *scraper) Start(context.Context, component.Host) error {
return fmt.Errorf("failed to open db connection: %w", err)
}
s.client = s.clientProviderFunc(s.db, s.query.SQL, s.logger)
s.startTime = pcommon.NewTimestampFromTime(time.Now())

return nil
}

func (s scraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
out := pmetric.NewMetrics()
rows, err := s.client.metricRows(ctx)
ts := pcommon.NewTimestampFromTime(time.Now())
if err != nil {
return out, fmt.Errorf("scraper: %w", err)
}
Expand All @@ -67,7 +74,7 @@ func (s scraper) Scrape(ctx context.Context) (pmetric.Metrics, error) {
var errs error
for _, metricCfg := range s.query.Metrics {
for i, row := range rows {
if err = rowToMetric(row, metricCfg, ms.AppendEmpty()); err != nil {
if err = rowToMetric(row, metricCfg, ms.AppendEmpty(), s.startTime, ts, s.scrapeCfg); err != nil {
err = fmt.Errorf("row %d: %w", i, err)
errs = multierr.Append(errs, err)
}
Expand Down
11 changes: 11 additions & 0 deletions unreleased/sqlqr-ts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change
note: Metrics did not contain a timestamp. Timestamps are required for a variety of backends, especially prometheus. In order to work with prometheus we must set the metrics timestamp because metrics with the default timestamp (year 1970) will be dropped by prometheus backends.

# One or more tracking issues related to the change
issues: [12088]

0 comments on commit 6b276a8

Please sign in to comment.