forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metricsdata_to_adx.go
291 lines (276 loc) · 10.3 KB
/
metricsdata_to_adx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package azuredataexplorerexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuredataexplorerexporter"
import (
"context"
"fmt"
"math"
"os"
"strconv"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
/*
A converter package that converts and marshals data to be written to ADX metrics tables
*/
const (
	// Resource attribute key whose (string) value overrides the collector's
	// own hostname in the emitted rows.
	hostkey = "host.name"
	// Metric-name suffix for the synthetic "sum" row emitted for both
	// summary and histogram data points.
	sumsuffix = "sum"
	// Metric-name suffix for the synthetic "count" row emitted for summary
	// and histogram data points.
	countsuffix = "count"
	// Description text appended for the synthetic "sum" row.
	sumdescription = "(Sum total of samples)"
	// Description text appended for the synthetic "count" row.
	countdescription = "(Count of samples)"
)
// AdxMetric is one flattened row of the OTELMetrics table in Azure Data
// Explorer. It is derived from the specification
// https://opentelemetry.io/docs/reference/specification/metrics/datamodel/
type AdxMetric struct {
	Timestamp string // The timestamp of the occurrence. A metric is measured at a point of time. Formatted into string as RFC3339Nano
	// Including name, the Metric object is defined by the following properties:
	MetricName        string         // Name of the metric field
	MetricType        string         // The data point type (e.g. Sum, Gauge, Histogram ExponentialHistogram, Summary)
	MetricUnit        string         // The metric stream’s unit
	MetricDescription string         // The metric stream’s description
	MetricValue       float64        // The value of the metric; all numeric point types are widened to float64
	MetricAttributes  map[string]any // JSON attributes that can then be parsed. Extrinsic properties (scope attributes merged with data-point attributes)
	// Additional properties
	Host               string         // The hostname for analysis of the metric. Extracted from https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/host/
	ResourceAttributes map[string]any // The originating Resource attributes. Refer https://opentelemetry.io/docs/reference/specification/resource/sdk/
}
/*
mapToAdxMetric converts one pmetric.Metric into the flattened AdxMetric rows
matching the scheme of the OTELMetric table in the database.

Row expansion per metric type:
  - Gauge / Sum: one row per data point.
  - Histogram: a "<name>_sum" and a "<name>_count" row per data point, then
    one cumulative "<name>_bucket" row per explicit bound plus a final +Inf
    bucket; each bucket row carries its bound in the "le" attribute.
  - Summary: a "<name>_sum" and a "<name>_count" row per data point, then one
    row per quantile, carrying the quantile in the "qt" attribute.
  - ExponentialHistogram / Empty (and any future type): logged as unsupported
    and dropped; nil is returned.
*/
func mapToAdxMetric(res pcommon.Resource, md pmetric.Metric, scopeattrs map[string]any, logger *zap.Logger) []*AdxMetric {
	logger.Debug("Entering processing of toAdxMetric function")
	// Default to the collector's host name. The error is deliberately
	// non-fatal: a missing hostname should not fail the export.
	host, err := os.Hostname()
	if err != nil {
		logger.Warn("Default collector hostname could not be retrieved", zap.Error(err))
	}
	resourceAttrs := res.Attributes().AsRaw()
	// Prefer the resource's "host.name" attribute when it is a string. The
	// comma-ok assertion (instead of a bare h.(string)) ensures a non-string
	// attribute value cannot panic the exporter; we simply keep the default.
	if h, ok := resourceAttrs[hostkey].(string); ok {
		host = h
	}
	// createMetric builds a single row. Empty name/desc default to the
	// metric's own name/description. The value is passed as a func so call
	// sites can hand over accessors such as dataPoint.Sum directly; it is
	// invoked exactly once, immediately, while building the row.
	createMetric := func(times time.Time, attr pcommon.Map, value func() float64, name string, desc string, mt pmetric.MetricType) *AdxMetric {
		// Clone the scope attributes before merging in the data-point
		// attributes so the shared scope map is never mutated.
		clonedScopedAttributes := copyMap(cloneMap(scopeattrs), attr.AsRaw())
		if isEmpty(name) {
			name = md.Name()
		}
		if isEmpty(desc) {
			desc = md.Description()
		}
		return &AdxMetric{
			Timestamp:          times.Format(time.RFC3339Nano),
			MetricName:         name,
			MetricType:         mt.String(),
			MetricUnit:         md.Unit(),
			MetricDescription:  desc,
			MetricValue:        value(),
			MetricAttributes:   clonedScopedAttributes,
			Host:               host,
			ResourceAttributes: resourceAttrs,
		}
	}
	//exhaustive:enforce
	switch md.Type() {
	case pmetric.MetricTypeGauge:
		dataPoints := md.Gauge().DataPoints()
		adxMetrics := make([]*AdxMetric, dataPoints.Len())
		for gi := 0; gi < dataPoints.Len(); gi++ {
			dataPoint := dataPoints.At(gi)
			adxMetrics[gi] = createMetric(dataPoint.Timestamp().AsTime(), dataPoint.Attributes(), func() float64 {
				// Widen int points to float64; the table column is float64.
				var metricValue float64
				switch dataPoint.ValueType() {
				case pmetric.NumberDataPointValueTypeInt:
					metricValue = float64(dataPoint.IntValue())
				case pmetric.NumberDataPointValueTypeDouble:
					metricValue = dataPoint.DoubleValue()
				}
				return metricValue
			}, "", "", pmetric.MetricTypeGauge)
		}
		return adxMetrics
	case pmetric.MetricTypeHistogram:
		dataPoints := md.Histogram().DataPoints()
		var adxMetrics []*AdxMetric
		for gi := 0; gi < dataPoints.Len(); gi++ {
			dataPoint := dataPoints.At(gi)
			bounds := dataPoint.ExplicitBounds()
			counts := dataPoint.BucketCounts()
			// First, add one event for sum, and one for count.
			{
				adxMetrics = append(adxMetrics,
					createMetric(dataPoint.Timestamp().AsTime(), dataPoint.Attributes(), dataPoint.Sum,
						fmt.Sprintf("%s_%s", md.Name(), sumsuffix),
						fmt.Sprintf("%s%s", md.Description(), sumdescription),
						pmetric.MetricTypeHistogram))
			}
			{
				adxMetrics = append(adxMetrics,
					createMetric(dataPoint.Timestamp().AsTime(), dataPoint.Attributes(), func() float64 {
						// Change int to float. The value is a float64 in the table.
						return float64(dataPoint.Count())
					},
						fmt.Sprintf("%s_%s", md.Name(), countsuffix),
						fmt.Sprintf("%s%s", md.Description(), countdescription),
						pmetric.MetricTypeHistogram))
			}
			// Spec says counts is optional but if present it must have one more
			// element than the bounds array; skip bucket rows otherwise.
			if counts.Len() == 0 || counts.Len() != bounds.Len()+1 {
				continue
			}
			// Running cumulative count: bucket rows are cumulative, matching
			// Prometheus-style "le" semantics.
			value := uint64(0)
			// Now create buckets for each bound.
			for bi := 0; bi < bounds.Len(); bi++ {
				// Attach the bucket's upper bound as the "le" attribute.
				customMap :=
					copyMap(map[string]any{"le": float64ToDimValue(bounds.At(bi))}, dataPoint.Attributes().AsRaw())
				value += counts.At(bi)
				vMap := pcommon.NewMap()
				//nolint:errcheck
				vMap.FromRaw(customMap)
				adxMetrics = append(adxMetrics, createMetric(dataPoint.Timestamp().AsTime(), vMap, func() float64 {
					// Change int to float. The value is a float64 in the table.
					return float64(value)
				},
					fmt.Sprintf("%s_bucket", md.Name()),
					"",
					pmetric.MetricTypeHistogram))
			}
			// Add an upper bound for +Inf, closing the cumulative series.
			{
				// Add the LE field for the bucket's bound.
				customMap :=
					copyMap(map[string]any{
						"le": float64ToDimValue(math.Inf(1)),
					}, dataPoint.Attributes().AsRaw())
				vMap := pcommon.NewMap()
				//nolint:errcheck
				vMap.FromRaw(customMap)
				adxMetrics = append(adxMetrics, createMetric(dataPoint.Timestamp().AsTime(), vMap, func() float64 {
					// Change int to float. The value is a float64 in the table.
					// counts has bounds.Len()+1 entries; the last one is the
					// overflow bucket beyond the final explicit bound.
					return float64(value + counts.At(counts.Len()-1))
				},
					fmt.Sprintf("%s_bucket", md.Name()),
					"",
					pmetric.MetricTypeHistogram))
			}
		}
		return adxMetrics
	case pmetric.MetricTypeSum:
		dataPoints := md.Sum().DataPoints()
		adxMetrics := make([]*AdxMetric, dataPoints.Len())
		for gi := 0; gi < dataPoints.Len(); gi++ {
			dataPoint := dataPoints.At(gi)
			adxMetrics[gi] = createMetric(dataPoint.Timestamp().AsTime(), dataPoint.Attributes(), func() float64 {
				// Widen int points to float64; the table column is float64.
				var metricValue float64
				switch dataPoint.ValueType() {
				case pmetric.NumberDataPointValueTypeInt:
					metricValue = float64(dataPoint.IntValue())
				case pmetric.NumberDataPointValueTypeDouble:
					metricValue = dataPoint.DoubleValue()
				}
				return metricValue
			}, "", "", pmetric.MetricTypeSum)
		}
		return adxMetrics
	case pmetric.MetricTypeSummary:
		dataPoints := md.Summary().DataPoints()
		var adxMetrics []*AdxMetric
		for gi := 0; gi < dataPoints.Len(); gi++ {
			dataPoint := dataPoints.At(gi)
			// First, add one event for sum, and one for count.
			{
				adxMetrics = append(adxMetrics, createMetric(dataPoint.Timestamp().AsTime(), dataPoint.Attributes(), dataPoint.Sum,
					fmt.Sprintf("%s_%s", md.Name(), sumsuffix),
					fmt.Sprintf("%s%s", md.Description(), sumdescription),
					pmetric.MetricTypeSummary))
			}
			// Counts.
			{
				adxMetrics = append(adxMetrics, createMetric(dataPoint.Timestamp().AsTime(),
					dataPoint.Attributes(),
					func() float64 {
						return float64(dataPoint.Count())
					},
					fmt.Sprintf("%s_%s", md.Name(), countsuffix),
					fmt.Sprintf("%s%s", md.Description(), countdescription),
					pmetric.MetricTypeSummary))
			}
			// Now create values for each quantile.
			for bi := 0; bi < dataPoint.QuantileValues().Len(); bi++ {
				dp := dataPoint.QuantileValues().At(bi)
				quantileName := fmt.Sprintf("%s_%s", md.Name(), strconv.FormatFloat(dp.Quantile(), 'f', -1, 64))
				// Attach the quantile ("qt") and its value as attributes
				// alongside the data-point attributes.
				metricQuantile := map[string]any{
					"qt":         float64ToDimValue(dp.Quantile()),
					quantileName: sanitizeFloat(dp.Value()).(float64),
				}
				customMap := copyMap(metricQuantile, dataPoint.Attributes().AsRaw())
				vMap := pcommon.NewMap()
				//nolint:errcheck
				vMap.FromRaw(customMap)
				adxMetrics = append(adxMetrics, createMetric(dataPoint.Timestamp().AsTime(),
					vMap,
					dp.Value,
					quantileName,
					fmt.Sprintf("%s%s", md.Description(), countdescription),
					pmetric.MetricTypeSummary))
			}
		}
		return adxMetrics
	case pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeEmpty:
		fallthrough
	default:
		logger.Warn(
			"Unsupported metric type : ",
			zap.Any("metric", md))
		return nil
	}
}
// rawMetricsToAdxMetrics walks every resource -> scope -> metric in the pdata
// payload and flattens each metric into AdxMetric rows via mapToAdxMetric,
// returning the concatenated result. The context parameter is currently
// unused.
func rawMetricsToAdxMetrics(_ context.Context, metrics pmetric.Metrics, logger *zap.Logger) []*AdxMetric {
	var transformedAdxMetrics []*AdxMetric
	resourceMetric := metrics.ResourceMetrics()
	for i := 0; i < resourceMetric.Len(); i++ {
		res := resourceMetric.At(i).Resource()
		scopeMetrics := resourceMetric.At(i).ScopeMetrics()
		for j := 0; j < scopeMetrics.Len(); j++ {
			scopeMetric := scopeMetrics.At(j)
			// Renamed from "metrics" to avoid shadowing the function
			// parameter of the same name.
			scopedMetrics := scopeMetric.Metrics()
			// Get details of the scope from the scope metric.
			scopeAttr := getScopeMap(scopeMetric.Scope())
			for k := 0; k < scopedMetrics.Len(); k++ {
				transformedAdxMetrics = append(transformedAdxMetrics, mapToAdxMetric(res, scopedMetrics.At(k), scopeAttr, logger)...)
			}
		}
	}
	return transformedAdxMetrics
}
// copyMap merges every key/value pair from src into dst, overwriting any
// existing keys, and returns dst for call-chaining convenience.
func copyMap(dst map[string]any, src map[string]any) map[string]any {
	for key, val := range src {
		dst[key] = val
	}
	return dst
}
// cloneMap returns a fresh, independent shallow copy of fields; mutating the
// result never affects the input map.
func cloneMap(fields map[string]any) map[string]any {
	clone := make(map[string]any, len(fields))
	for key, val := range fields {
		clone[key] = val
	}
	return clone
}
// float64ToDimValue renders f as the shortest decimal string that parses back
// to the same float64 (e.g. "0.5", "1e+06", "+Inf"), for use as an attribute
// dimension value.
func float64ToDimValue(f float64) string {
	// -1 asks FormatFloat for the minimal number of digits that round-trips.
	const shortestPrecision = -1
	return strconv.FormatFloat(f, 'g', shortestPrecision, 64)
}
// sanitizeFloat normalizes the special float64 values NaN, +Inf and -Inf to
// their canonical math-package representations and returns any other value
// unchanged, boxed as any.
func sanitizeFloat(value float64) any {
	switch {
	case math.IsNaN(value):
		return math.NaN()
	case math.IsInf(value, 1):
		return math.Inf(1)
	case math.IsInf(value, -1):
		return math.Inf(-1)
	default:
		return value
	}
}