From 4943c97963be25fe8498bf1ecc8bedc361c6b905 Mon Sep 17 00:00:00 2001 From: Christian Schneider Date: Wed, 6 Dec 2023 10:12:23 +0100 Subject: [PATCH] Add option to enforce strict monotonicy in metrics. This change adds the new option `force_monotonicy` to metric configurations. It is intended for almost-but-not-really monotinic sources, such as counters which reset when the sensor is restarted. When this option is set to `true`, the source metric value is regularly written to disk. This allows us to detect and compensate counter resets even between restarts. When a reset is detected, the last value before the reset becomes the new offset, which is added to the metric value going forth. The result is a strictly monotonic time series, like an ever increasing counter. --- Readme.md | 14 +++++ cmd/mqtt2prometheus.go | 2 +- pkg/config/config.go | 24 +++++++- pkg/metrics/extractor.go | 17 +++++- pkg/metrics/parser.go | 94 +++++++++++++++++++++++++++- pkg/metrics/parser_test.go | 121 +++++++++++++++++++++++++++++++++++-- 6 files changed, 261 insertions(+), 11 deletions(-) diff --git a/Readme.md b/Readme.md index 99f7d09..b79484d 100644 --- a/Readme.md +++ b/Readme.md @@ -182,6 +182,8 @@ cache: # Set the timeout to -1 to disable the deletion of metrics from the cache. The exporter presents the ingest timestamp # to prometheus. timeout: 24h + # Path to the directory to keep the state for monotonic metrics. + state_directory: "/var/lib/mqtt2prometheus" json_parsing: # Separator. Used to split path to elements when accessing json fields. # You can access json fields with dots in it. F.E. {"key.name": {"nested": "value"}} @@ -248,6 +250,18 @@ metrics: # Metric value to use if a match cannot be found in the map above. # If not specified, parsing error will occur. error_value: 1 + # The name of the metric in prometheus + - prom_name: total_energy + # The name of the metric in a MQTT JSON message + mqtt_name: aenergy.total + # Regular expression to only match sensors with the given name pattern + sensor_name_filter: "^shellyplus1pm-.*$" + # The prometheus help text for this metric + help: Total energy used + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: counter + # This setting requires an almost monotonic counter as the source. When monotonicy is enforced, the metric value is regularly written to disk. Thus, resets in the source counter can be detected and corrected by adding an offset as if the reset did not happen. The result is a strict monotonic increasing time series, like an ever growing counter. + force_monotonicy: true ``` diff --git a/cmd/mqtt2prometheus.go b/cmd/mqtt2prometheus.go index 02d1776..6adcb00 100644 --- a/cmd/mqtt2prometheus.go +++ b/cmd/mqtt2prometheus.go @@ -227,7 +227,7 @@ func setupGoKitLogger(l *zap.Logger) log.Logger { } func setupExtractor(cfg config.Config) (metrics.Extractor, error) { - parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator) + parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator, cfg.Cache.StateDir) if cfg.MQTT.ObjectPerTopicConfig != nil { switch cfg.MQTT.ObjectPerTopicConfig.Encoding { case config.EncodingJSON: diff --git a/pkg/config/config.go b/pkg/config/config.go index 376de29..a14b510 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "io/ioutil" + "os" "regexp" "time" @@ -26,7 +27,8 @@ var MQTTConfigDefaults = MQTTConfig{ } var CacheConfigDefaults = CacheConfig{ - Timeout: 2 * time.Minute, + Timeout: 2 * time.Minute, + StateDir: "/var/lib/mqtt2prometheus", } var JsonParsingConfigDefaults = JsonParsingConfig{ @@ -94,7 +96,8 @@ type Config struct { } type CacheConfig struct { - Timeout time.Duration `yaml:"timeout"` + Timeout time.Duration `yaml:"timeout"` + StateDir string `yaml:"state_directory"` } type JsonParsingConfig struct { @@ -135,6 +138,7 @@ type MetricConfig struct { Help string `yaml:"help"` ValueType string `yaml:"type"` OmitTimestamp bool `yaml:"omit_timestamp"` + ForceMonotonicy bool `yaml:"force_monotonicy"` ConstantLabels map[string]string `yaml:"const_labels"` StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"` MQTTValueScale float64 `yaml:"mqtt_value_scale"` @@ -179,6 +183,9 @@ func LoadConfig(configFile string) (Config, error) { if cfg.Cache == nil { cfg.Cache = &CacheConfigDefaults } + if cfg.Cache.StateDir == "" { + cfg.Cache.StateDir = CacheConfigDefaults.StateDir + } if cfg.JsonParsing == nil { cfg.JsonParsing = &JsonParsingConfigDefaults } @@ -217,5 +224,18 @@ func LoadConfig(configFile string) (Config, error) { } } + // If any metric forces monotonicy, we need a state directory. + forcesMonotonicy := false + for _, m := range cfg.Metrics { + if m.ForceMonotonicy { + forcesMonotonicy = true + } + } + if forcesMonotonicy { + if err := os.MkdirAll(cfg.Cache.StateDir, 0755); err != nil { + return Config{}, err + } + } + return cfg, nil } diff --git a/pkg/metrics/extractor.go b/pkg/metrics/extractor.go index 0476cd1..ea91e38 100644 --- a/pkg/metrics/extractor.go +++ b/pkg/metrics/extractor.go @@ -2,6 +2,7 @@ package metrics import ( "fmt" + "regexp" "github.com/hikhvar/mqtt2prometheus/pkg/config" gojsonq "github.com/thedevsaddam/gojsonq/v2" @@ -9,6 +10,16 @@ import ( type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error) +// metricID returns a deterministic identifier per metic config which is safe to use in a file path. +func metricID(topic, metric, deviceID, promName string) string { + re := regexp.MustCompile(`[^a-zA-Z0-9]`) + deviceID = re.ReplaceAllString(deviceID, "_") + topic = re.ReplaceAllString(topic, "_") + metric = re.ReplaceAllString(metric, "_") + promName = re.ReplaceAllString(promName, "_") + return fmt.Sprintf("%s-%s-%s-%s", deviceID, topic, metric, promName) +} + func NewJSONObjectExtractor(p Parser) Extractor { return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { var mc MetricCollection @@ -27,7 +38,8 @@ func NewJSONObjectExtractor(p Parser) Extractor { continue } - m, err := p.parseMetric(config, rawValue) + id := metricID(topic, path, deviceID, config.PrometheusName) + m, err := p.parseMetric(config, id, rawValue) if err != nil { return nil, fmt.Errorf("failed to parse valid metric value: %w", err) } @@ -63,7 +75,8 @@ func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extrac rawValue = string(payload) } - m, err := p.parseMetric(config, rawValue) + id := metricID(topic, metricName, deviceID, config.PrometheusName) + m, err := p.parseMetric(config, id, rawValue) if err != nil { return nil, fmt.Errorf("failed to parse metric: %w", err) } diff --git a/pkg/metrics/parser.go b/pkg/metrics/parser.go index 2482239..9efbd66 100644 --- a/pkg/metrics/parser.go +++ b/pkg/metrics/parser.go @@ -2,22 +2,44 @@ package metrics import ( "fmt" + "os" "strconv" + "strings" "time" "github.com/hikhvar/mqtt2prometheus/pkg/config" + "gopkg.in/yaml.v2" ) +// monotonicState holds the runtime information to realize a monotonic increasing value. +type monotonicState struct { + // Basline value to add to each parsed metric value to maintain monotonicy + Offset float64 `yaml:"value_offset"` + // Last value that was parsed before the offset was added + LastRawValue float64 `yaml:"last_raw_value"` +} + +// metricState holds runtime information per metric configuration. +type metricState struct { + monotonic monotonicState + // The last time the state file was written + lastWritten time.Time +} + type Parser struct { separator string // Maps the mqtt metric name to a list of configs // The first that matches SensorNameFilter will be used metricConfigs map[string][]config.MetricConfig + // Directory holding state files + stateDir string + // Per-metric state + states map[string]*metricState } var now = time.Now -func NewParser(metrics []config.MetricConfig, separator string) Parser { +func NewParser(metrics []config.MetricConfig, separator, stateDir string) Parser { cfgs := make(map[string][]config.MetricConfig) for i := range metrics { key := metrics[i].MQTTName @@ -26,6 +48,8 @@ func NewParser(metrics []config.MetricConfig, separator string) Parser { return Parser{ separator: separator, metricConfigs: cfgs, + stateDir: strings.TrimRight(stateDir, "/"), + states: make(map[string]*metricState), } } @@ -47,7 +71,7 @@ func (p *Parser) findMetricConfig(metric string, deviceID string) (config.Metric // parseMetric parses the given value according to the given deviceID and metricPath. The config allows to // parse a metric value according to the device ID. -func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric, error) { +func (p *Parser) parseMetric(cfg config.MetricConfig, metricID string, value interface{}) (Metric, error) { var metricValue float64 if boolValue, ok := value.(bool); ok { @@ -87,6 +111,22 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value) } + if cfg.ForceMonotonicy { + ms, err := p.getMetricState(metricID) + if err != nil { + return Metric{}, err + } + // When the source metric is reset, the last adjusted value becomes the new offset. + if metricValue < ms.monotonic.LastRawValue { + ms.monotonic.Offset += ms.monotonic.LastRawValue + // Trigger flushing the new state to disk. + ms.lastWritten = time.Time{} + } + + ms.monotonic.LastRawValue = metricValue + metricValue += ms.monotonic.Offset + } + if cfg.MQTTValueScale != 0 { metricValue = metricValue * cfg.MQTTValueScale } @@ -103,3 +143,53 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric IngestTime: ingestTime, }, nil } + +func (p *Parser) stateFileName(metricID string) string { + return fmt.Sprintf("%s/%s.yaml", p.stateDir, metricID) +} + +// readMetricState parses the metric state from the configured path. +// If the file does not exist, an empty state is returned. +func (p *Parser) readMetricState(metricID string) (*metricState, error) { + data, err := os.ReadFile(p.stateFileName(metricID)) + state := &metricState{} + if err != nil { + // The file does not exist for new metrics. + if os.IsNotExist(err) { + return state, nil + } + return state, err + } + err = yaml.UnmarshalStrict(data, &state.monotonic) + state.lastWritten = now() + return state, err +} + +// writeMetricState writes back the metric's current state to the configured path. +func (p *Parser) writeMetricState(metricID string, state *metricState) error { + out, err := yaml.Marshal(state.monotonic) + if err != nil { + return err + } + return os.WriteFile(p.stateFileName(metricID), out, 0644) +} + +// getMetricState returns the state of the given metric. +// The state is read from and written back to disk as needed. +func (p *Parser) getMetricState(metricID string) (*metricState, error) { + var err error + state, found := p.states[metricID] + if !found { + if state, err = p.readMetricState(metricID); err != nil { + return nil, err + } + p.states[metricID] = state + } + // Write the state back to disc every minute. + if now().Sub(state.lastWritten) >= time.Minute { + if err = p.writeMetricState(metricID, state); err == nil { + state.lastWritten = now() + } + } + return state, err +} diff --git a/pkg/metrics/parser_test.go b/pkg/metrics/parser_test.go index d23df61..31d3b72 100644 --- a/pkg/metrics/parser_test.go +++ b/pkg/metrics/parser_test.go @@ -1,6 +1,7 @@ package metrics import ( + "os" "reflect" "testing" "time" @@ -10,6 +11,12 @@ import ( ) func TestParser_parseMetric(t *testing.T) { + stateDir, err := os.MkdirTemp("", "parser_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(stateDir) + now = testNow type fields struct { metricConfigs map[string][]config.MetricConfig @@ -415,12 +422,111 @@ func TestParser_parseMetric(t *testing.T) { }, wantErr: true, }, + { + name: "monotonic gauge, step 1: initial value", + fields: fields{ + map[string][]config.MetricConfig{ + "aenergy.total": []config.MetricConfig{ + { + PrometheusName: "total_energy", + ValueType: "gauge", + OmitTimestamp: true, + ForceMonotonicy: true, + }, + }, + }, + }, + args: args{ + metricPath: "aenergy.total", + deviceID: "shellyplus1pm-foo", + value: 1.0, + }, + want: Metric{ + Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil), + ValueType: prometheus.GaugeValue, + Value: 1.0, + }, + }, + { + name: "monotonic gauge, step 2: monotonic increase does not add offset", + fields: fields{ + map[string][]config.MetricConfig{ + "aenergy.total": []config.MetricConfig{ + { + PrometheusName: "total_energy", + ValueType: "gauge", + OmitTimestamp: true, + ForceMonotonicy: true, + }, + }, + }, + }, + args: args{ + metricPath: "aenergy.total", + deviceID: "shellyplus1pm-foo", + value: 2.0, + }, + want: Metric{ + Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil), + ValueType: prometheus.GaugeValue, + Value: 2.0, + }, + }, + { + name: "monotonic gauge, step 3: raw metric is reset, last value becomes the new offset", + fields: fields{ + map[string][]config.MetricConfig{ + "aenergy.total": []config.MetricConfig{ + { + PrometheusName: "total_energy", + ValueType: "gauge", + OmitTimestamp: true, + ForceMonotonicy: true, + }, + }, + }, + }, + args: args{ + metricPath: "aenergy.total", + deviceID: "shellyplus1pm-foo", + value: 0.0, + }, + want: Metric{ + Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil), + ValueType: prometheus.GaugeValue, + Value: 2.0, + }, + }, + { + name: "monotonic gauge, step 4: monotonic increase with offset", + fields: fields{ + map[string][]config.MetricConfig{ + "aenergy.total": []config.MetricConfig{ + { + PrometheusName: "total_energy", + ValueType: "gauge", + OmitTimestamp: true, + ForceMonotonicy: true, + }, + }, + }, + }, + args: args{ + metricPath: "aenergy.total", + deviceID: "shellyplus1pm-foo", + value: 1.0, + }, + want: Metric{ + Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil), + ValueType: prometheus.GaugeValue, + Value: 3.0, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := &Parser{ - metricConfigs: tt.fields.metricConfigs, - } + p := NewParser(nil, config.JsonParsingConfigDefaults.Separator, stateDir) + p.metricConfigs = tt.fields.metricConfigs // Find a valid metrics config config, found := p.findMetricConfig(tt.args.metricPath, tt.args.deviceID) @@ -431,7 +537,8 @@ func TestParser_parseMetric(t *testing.T) { return } - got, err := p.parseMetric(config, tt.args.value) + id := metricID("", tt.args.metricPath, tt.args.deviceID, config.PrometheusName) + got, err := p.parseMetric(config, id, tt.args.value) if (err != nil) != tt.wantErr { t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr) return @@ -439,6 +546,12 @@ func TestParser_parseMetric(t *testing.T) { if !reflect.DeepEqual(got, tt.want) { t.Errorf("parseMetric() got = %v, want %v", got, tt.want) } + + if config.ForceMonotonicy { + if err = p.writeMetricState(id, p.states[id]); err != nil { + t.Errorf("failed to write metric state: %v", err) + } + } }) } }