diff --git a/README.md b/README.md
index bc049c1106ae8..ea3b106287e48 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
   •
   Getting Involved
   •
-  Getting In Touch
+  Getting In Touch

diff --git a/exporter/prometheusexporter/collector.go b/exporter/prometheusexporter/collector.go
index 7caeb2e2c25aa..02ec3d761db02 100644
--- a/exporter/prometheusexporter/collector.go
+++ b/exporter/prometheusexporter/collector.go
@@ -219,18 +219,22 @@ func (c *collector) convertDoubleHistogram(metric pmetric.Metric, resourceAttrs
 	arrLen := ip.Exemplars().Len()
 	exemplars := make([]prometheus.Exemplar, arrLen)
+
 	for i := 0; i < arrLen; i++ {
 		e := ip.Exemplars().At(i)
+		exemplarLabels := make(prometheus.Labels, 0)
 
-		labels := make(prometheus.Labels, e.FilteredAttributes().Len())
-		e.FilteredAttributes().Range(func(k string, v pcommon.Value) bool {
-			labels[k] = v.AsString()
-			return true
-		})
+		if !e.TraceID().IsEmpty() {
+			exemplarLabels["trace_id"] = e.TraceID().HexString()
+		}
+
+		if !e.SpanID().IsEmpty() {
+			exemplarLabels["span_id"] = e.SpanID().HexString()
+		}
 
 		exemplars[i] = prometheus.Exemplar{
 			Value:     e.DoubleVal(),
-			Labels:    labels,
+			Labels:    exemplarLabels,
 			Timestamp: e.Timestamp().AsTime(),
 		}
 	}
diff --git a/exporter/prometheusexporter/collector_test.go b/exporter/prometheusexporter/collector_test.go
index 067f82877cb02..881bff2220a98 100644
--- a/exporter/prometheusexporter/collector_test.go
+++ b/exporter/prometheusexporter/collector_test.go
@@ -15,6 +15,7 @@ package prometheusexporter
 
 import (
+	"encoding/hex"
 	"strings"
 	"testing"
 	"time"
@@ -106,50 +107,35 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {
 	metric.SetUnit("T")
 
 	// initialize empty datapoint
-	hd := metric.Histogram().DataPoints().AppendEmpty()
+	histogramDataPoint := metric.Histogram().DataPoints().AppendEmpty()
 
 	bounds := pcommon.NewImmutableFloat64Slice([]float64{5, 25, 90})
-	hd.SetExplicitBounds(bounds)
+	histogramDataPoint.SetExplicitBounds(bounds)
 
 	bc := pcommon.NewImmutableUInt64Slice([]uint64{2, 35, 70})
-	hd.SetBucketCounts(bc)
+	histogramDataPoint.SetBucketCounts(bc)
+
+	// add test exemplar values to the metric
+	promExporterExemplars := histogramDataPoint.Exemplars().AppendEmpty()
+	var tBytes [16]byte
+	testTraceID, _ := hex.DecodeString("641d68e314a58152cc2581e7663435d1")
+	copy(tBytes[:], testTraceID)
+	traceID := pcommon.NewTraceID(tBytes)
+	promExporterExemplars.SetTraceID(traceID)
+
+	var sBytes [8]byte
+	testSpanID, _ := hex.DecodeString("7436d6ac76178623")
+	copy(sBytes[:], testSpanID)
+	spanID := pcommon.NewSpanID(sBytes)
+	promExporterExemplars.SetSpanID(spanID)
 
 	exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
-	exemplars := []prometheus.Exemplar{
-		{
-			Timestamp: exemplarTs,
-			Value:     3,
-			Labels:    prometheus.Labels{"test_label_0": "label_value_0"},
-		},
-		{
-			Timestamp: exemplarTs,
-			Value:     50,
-			Labels:    prometheus.Labels{"test_label_1": "label_value_1"},
-		},
-		{
-			Timestamp: exemplarTs,
-			Value:     78,
-			Labels:    prometheus.Labels{"test_label_2": "label_value_2"},
-		},
-		{
-			Timestamp: exemplarTs,
-			Value:     100,
-			Labels:    prometheus.Labels{"test_label_3": "label_value_3"},
-		},
-	}
-
-	// add each exemplar value to the metric
-	for _, e := range exemplars {
-		pde := hd.Exemplars().AppendEmpty()
-		pde.SetDoubleVal(e.Value)
-		for k, v := range e.Labels {
-			pde.FilteredAttributes().UpsertString(k, v)
-		}
-		pde.SetTimestamp(pcommon.NewTimestampFromTime(e.Timestamp))
-	}
+	promExporterExemplars.SetTimestamp(pcommon.NewTimestampFromTime(exemplarTs))
+	promExporterExemplars.SetDoubleVal(3.0)
 
 	pMap := pcommon.NewMap()
 
 	c := collector{
+
 		accumulator: &mockAccumulator{
 			metrics:            []pmetric.Metric{metric},
 			resourceAttributes: pMap,
@@ -166,29 +152,17 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {
 	buckets := m.GetHistogram().GetBucket()
 
-	require.Equal(t, 4, len(buckets))
+	require.Equal(t, 3, len(buckets))
 	require.Equal(t, 3.0, buckets[0].GetExemplar().GetValue())
 	require.Equal(t, int32(128654848), buckets[0].GetExemplar().GetTimestamp().GetNanos())
-	require.Equal(t, 1, len(buckets[0].GetExemplar().GetLabel()))
-	require.Equal(t, "test_label_0", buckets[0].GetExemplar().GetLabel()[0].GetName())
-	require.Equal(t, "label_value_0", buckets[0].GetExemplar().GetLabel()[0].GetValue())
-
-	require.Equal(t, 0.0, buckets[1].GetExemplar().GetValue())
-	require.Equal(t, int32(0), buckets[1].GetExemplar().GetTimestamp().GetNanos())
-	require.Equal(t, 0, len(buckets[1].GetExemplar().GetLabel()))
-
-	require.Equal(t, 78.0, buckets[2].GetExemplar().GetValue())
-	require.Equal(t, int32(128654848), buckets[2].GetExemplar().GetTimestamp().GetNanos())
-	require.Equal(t, 1, len(buckets[2].GetExemplar().GetLabel()))
-	require.Equal(t, "test_label_2", buckets[2].GetExemplar().GetLabel()[0].GetName())
-	require.Equal(t, "label_value_2", buckets[2].GetExemplar().GetLabel()[0].GetValue())
-
-	require.Equal(t, 100.0, buckets[3].GetExemplar().GetValue())
-	require.Equal(t, int32(128654848), buckets[3].GetExemplar().GetTimestamp().GetNanos())
-	require.Equal(t, 1, len(buckets[3].GetExemplar().GetLabel()))
-	require.Equal(t, "test_label_3", buckets[3].GetExemplar().GetLabel()[0].GetName())
-	require.Equal(t, "label_value_3", buckets[3].GetExemplar().GetLabel()[0].GetValue())
+	require.Equal(t, 2, len(buckets[0].GetExemplar().GetLabel()))
+	ml := make(map[string]string)
+	for _, l := range buckets[0].GetExemplar().GetLabel() {
+		ml[l.GetName()] = l.GetValue()
+	}
+	require.Equal(t, "641d68e314a58152cc2581e7663435d1", ml["trace_id"])
+	require.Equal(t, "7436d6ac76178623", ml["span_id"])
 }
 
 // errorCheckCore keeps track of logged errors
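Note: the exporter half of this change reads the exemplar's typed TraceID/SpanID fields and emits them as OpenMetrics-style trace_id/span_id exemplar labels. A self-contained sketch of that mapping follows — toPromExemplars is a hypothetical name; every call it makes appears in the diff above.

    package sketch

    import (
    	"github.com/prometheus/client_golang/prometheus"
    	"go.opentelemetry.io/collector/pdata/pmetric"
    )

    // toPromExemplars converts the exemplars of one histogram data point the
    // way convertDoubleHistogram now does.
    func toPromExemplars(ip pmetric.HistogramDataPoint) []prometheus.Exemplar {
    	out := make([]prometheus.Exemplar, 0, ip.Exemplars().Len())
    	for i := 0; i < ip.Exemplars().Len(); i++ {
    		e := ip.Exemplars().At(i)
    		labels := make(prometheus.Labels)
    		// Unset IDs are skipped so all-zero trace_id/span_id labels are
    		// never emitted.
    		if !e.TraceID().IsEmpty() {
    			labels["trace_id"] = e.TraceID().HexString()
    		}
    		if !e.SpanID().IsEmpty() {
    			labels["span_id"] = e.SpanID().HexString()
    		}
    		out = append(out, prometheus.Exemplar{
    			Value:     e.DoubleVal(),
    			Labels:    labels,
    			Timestamp: e.Timestamp().AsTime(),
    		})
    	}
    	return out
    }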
diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go
index 6575cc557664f..b401ee3f36ff1 100644
--- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go
+++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs.go
@@ -86,7 +86,7 @@ func (d *Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL
 		attr.UpsertString(conventions.AttributeCloudRegion, region)
 	}
 
-	// TMDE returns the the cluster short name or ARN, so we need to construct the ARN if necessary
+	// TMDE returns the cluster short name or ARN, so we need to construct the ARN if necessary
 	attr.UpsertString(conventions.AttributeAWSECSClusterARN, constructClusterArn(tmdeResp.Cluster, region, account))
 
 	// The Availability Zone is not available in all Fargate runtimes
@@ -109,18 +109,7 @@ func (d *Detector) Detect(context.Context) (resource pcommon.Resource, schemaURL
 		return res, "", err
 	}
 
-	logAttributes := [4]string{
-		conventions.AttributeAWSLogGroupNames,
-		conventions.AttributeAWSLogGroupARNs,
-		conventions.AttributeAWSLogStreamNames,
-		conventions.AttributeAWSLogStreamARNs,
-	}
-
-	for i, attribVal := range getValidLogData(tmdeResp.Containers, selfMetaData, account) {
-		if attribVal.SliceVal().Len() > 0 {
-			attr.Insert(logAttributes[i], attribVal)
-		}
-	}
+	addValidLogData(tmdeResp.Containers, selfMetaData, account, attr)
 
 	return res, conventions.SchemaURL, nil
 }
@@ -149,11 +138,12 @@ func parseRegionAndAccount(taskARN string) (region string, account string) {
 // "init" containers which only run at startup then shutdown (as indicated by the "KnownStatus" attribute),
 // containers not using AWS Logs, and those without log group metadata to get the final lists of valid log data
 // See: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html#task-metadata-endpoint-v4-response
-func getValidLogData(containers []ecsutil.ContainerMetadata, self *ecsutil.ContainerMetadata, account string) [4]pcommon.Value {
-	logGroupNames := pcommon.NewValueSlice()
-	logGroupArns := pcommon.NewValueSlice()
-	logStreamNames := pcommon.NewValueSlice()
-	logStreamArns := pcommon.NewValueSlice()
+func addValidLogData(containers []ecsutil.ContainerMetadata, self *ecsutil.ContainerMetadata, account string, dest pcommon.Map) {
+	initialized := false
+	var logGroupNames pcommon.Slice
+	var logGroupArns pcommon.Slice
+	var logStreamNames pcommon.Slice
+	var logStreamArns pcommon.Slice
 
 	for _, container := range containers {
 		logData := container.LogOptions
@@ -162,15 +152,19 @@ func addValidLogData(containers []ecsutil.ContainerMetadata, self *ecsutil.Conta
 			container.LogDriver == "awslogs" &&
 			self.DockerID != container.DockerID &&
 			logData != (ecsutil.LogOptions{}) {
-
-			logGroupNames.SliceVal().AppendEmpty().SetStringVal(logData.LogGroup)
-			logGroupArns.SliceVal().AppendEmpty().SetStringVal(constructLogGroupArn(logData.Region, account, logData.LogGroup))
-			logStreamNames.SliceVal().AppendEmpty().SetStringVal(logData.Stream)
-			logStreamArns.SliceVal().AppendEmpty().SetStringVal(constructLogStreamArn(logData.Region, account, logData.LogGroup, logData.Stream))
+			if !initialized {
+				logGroupNames = dest.UpsertEmptySlice(conventions.AttributeAWSLogGroupNames)
+				logGroupArns = dest.UpsertEmptySlice(conventions.AttributeAWSLogGroupARNs)
+				logStreamNames = dest.UpsertEmptySlice(conventions.AttributeAWSLogStreamNames)
+				logStreamArns = dest.UpsertEmptySlice(conventions.AttributeAWSLogStreamARNs)
+				initialized = true
+			}
+			logGroupNames.AppendEmpty().SetStringVal(logData.LogGroup)
+			logGroupArns.AppendEmpty().SetStringVal(constructLogGroupArn(logData.Region, account, logData.LogGroup))
+			logStreamNames.AppendEmpty().SetStringVal(logData.Stream)
+			logStreamArns.AppendEmpty().SetStringVal(constructLogStreamArn(logData.Region, account, logData.LogGroup, logData.Stream))
 		}
 	}
-
-	return [4]pcommon.Value{logGroupNames, logGroupArns, logStreamNames, logStreamArns}
 }
 
 func constructLogGroupArn(region, account, group string) string {
diff --git a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go
index bf5b44ebdfe38..d90de6a9eaeea 100644
--- a/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go
+++ b/processor/resourcedetectionprocessor/internal/aws/ecs/ecs_test.go
@@ -91,11 +91,9 @@ func Test_ecsFiltersInvalidContainers(t *testing.T) {
 	containers := []ecsutil.ContainerMetadata{c1, c2, c3, c4}
 
-	ld := getValidLogData(containers, &c4, "123")
-
-	for _, attrib := range ld {
-		assert.Equal(t, 0, attrib.SliceVal().Len())
-	}
+	dest := pcommon.NewMap()
+	addValidLogData(containers, &c4, "123", dest)
+	assert.Equal(t, 0, dest.Len())
 }
 
 func Test_ecsDetectV4(t *testing.T) {
diff --git a/processor/resourcedetectionprocessor/internal/env/env.go b/processor/resourcedetectionprocessor/internal/env/env.go
index e52bc8fbd62a7..83e45c1151e5b 100644
--- a/processor/resourcedetectionprocessor/internal/env/env.go
+++ b/processor/resourcedetectionprocessor/internal/env/env.go
@@ -78,7 +78,6 @@ var labelRegex = regexp.MustCompile(`\s*([[:ascii:]]{1,256}?)\s*=\s*([[:ascii:]]
 func initializeAttributeMap(am pcommon.Map, s string) error {
 	matches := labelRegex.FindAllStringSubmatchIndex(s, -1)
-
 	for len(matches) == 0 {
 		return fmt.Errorf("invalid resource format: %q", s)
 	}
@@ -97,7 +96,7 @@ func initializeAttributeMap(am pcommon.Map, s string) error {
 		if value, err = url.QueryUnescape(value); err != nil {
 			return fmt.Errorf("invalid resource format in attribute: %q, err: %w", s[match[0]:match[1]], err)
 		}
-		am.InsertString(key, value)
+		am.UpsertString(key, value)
 
 		prevIndex = match[1]
 	}
diff --git a/processor/resourcedetectionprocessor/internal/gcp/gcp.go b/processor/resourcedetectionprocessor/internal/gcp/gcp.go
index 198ee65f59397..b2406f39e6148 100644
--- a/processor/resourcedetectionprocessor/internal/gcp/gcp.go
+++ b/processor/resourcedetectionprocessor/internal/gcp/gcp.go
@@ -119,44 +119,49 @@ type resourceBuilder struct {
 }
 
 func (r *resourceBuilder) add(key string, detect func() (string, error)) {
-	if v, err := detect(); err == nil {
-		r.attrs.InsertString(key, v)
-	} else {
+	v, err := detect()
+	if err != nil {
 		r.errs = append(r.errs, err)
+		return
 	}
+	r.attrs.UpsertString(key, v)
 }
 
 // addFallible adds a detect function whose failures should be ignored
 func (r *resourceBuilder) addFallible(key string, detect func() (string, error)) {
-	if v, err := detect(); err == nil {
-		r.attrs.InsertString(key, v)
-	} else {
+	v, err := detect()
+	if err != nil {
 		r.logger.Info("Fallible detector failed. This attribute will not be available.", zap.String("key", key), zap.Error(err))
+		return
 	}
+	r.attrs.UpsertString(key, v)
 }
 
 // zoneAndRegion functions are expected to return zone, region, err.
 func (r *resourceBuilder) addZoneAndRegion(detect func() (string, string, error)) {
-	if zone, region, err := detect(); err == nil {
-		r.attrs.InsertString(conventions.AttributeCloudAvailabilityZone, zone)
-		r.attrs.InsertString(conventions.AttributeCloudRegion, region)
-	} else {
+	zone, region, err := detect()
+	if err != nil {
 		r.errs = append(r.errs, err)
+		return
 	}
+	r.attrs.UpsertString(conventions.AttributeCloudAvailabilityZone, zone)
+	r.attrs.UpsertString(conventions.AttributeCloudRegion, region)
 }
 
 func (r *resourceBuilder) addZoneOrRegion(detect func() (string, gcp.LocationType, error)) {
-	if v, locType, err := detect(); err == nil {
-		switch locType {
-		case gcp.Zone:
-			r.attrs.InsertString(conventions.AttributeCloudAvailabilityZone, v)
-		case gcp.Region:
-			r.attrs.InsertString(conventions.AttributeCloudRegion, v)
-		default:
-			r.errs = append(r.errs, fmt.Errorf("location must be zone or region. Got %v", locType))
-		}
-	} else {
+	v, locType, err := detect()
+	if err != nil {
 		r.errs = append(r.errs, err)
+		return
+	}
+
+	switch locType {
+	case gcp.Zone:
+		r.attrs.UpsertString(conventions.AttributeCloudAvailabilityZone, v)
+	case gcp.Region:
+		r.attrs.UpsertString(conventions.AttributeCloudRegion, v)
+	default:
+		r.errs = append(r.errs, fmt.Errorf("location must be zone or region. Got %v", locType))
 	}
 }
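Note: every resourcedetection change above replaces Insert* with Upsert*. The semantic difference is easy to miss: Insert was a no-op when the key already existed, whereas Upsert overwrites. A minimal runnable sketch, assuming the pre-1.0 pdata API used throughout this diff:

    package main

    import (
    	"fmt"

    	"go.opentelemetry.io/collector/pdata/pcommon"
    )

    func main() {
    	attrs := pcommon.NewMap()
    	attrs.UpsertString("cloud.region", "us-east-1")
    	// Upsert overwrites an existing key; the removed Insert variant
    	// would have silently kept "us-east-1" here.
    	attrs.UpsertString("cloud.region", "us-west-2")
    	if v, ok := attrs.Get("cloud.region"); ok {
    		fmt.Println(v.StringVal()) // prints: us-west-2
    	}
    }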
diff --git a/processor/routingprocessor/extract_test.go b/processor/routingprocessor/extract_test.go
index a8d992024f32d..df03b66e2b574 100644
--- a/processor/routingprocessor/extract_test.go
+++ b/processor/routingprocessor/extract_test.go
@@ -93,8 +93,7 @@ func TestExtractorForTraces_FromResourceAttribute(t *testing.T) {
 			tracesFunc: func() ptrace.Traces {
 				traces := ptrace.NewTraces()
 				rSpans := traces.ResourceSpans().AppendEmpty()
-				rSpans.Resource().Attributes().
-					InsertString("k8s.namespace.name", "namespace-1")
+				rSpans.Resource().Attributes().UpsertString("k8s.namespace.name", "namespace-1")
 				return traces
 			},
 			fromAttr: "k8s.namespace.name",
@@ -105,8 +104,7 @@ func TestExtractorForTraces_FromResourceAttribute(t *testing.T) {
 			tracesFunc: func() ptrace.Traces {
 				traces := ptrace.NewTraces()
 				rSpans := traces.ResourceSpans().AppendEmpty()
-				rSpans.Resource().Attributes().
-					InsertString("k8s.namespace.name", "namespace-1")
+				rSpans.Resource().Attributes().UpsertString("k8s.namespace.name", "namespace-1")
 				return traces
 			},
 			fromAttr: "k8s.namespace.name",
diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go
index efa9abb5d845d..e9509ae72c2b5 100644
--- a/processor/servicegraphprocessor/processor_test.go
+++ b/processor/servicegraphprocessor/processor_test.go
@@ -220,7 +220,7 @@ func sampleTraces() ptrace.Traces {
 	clientSpan.SetKind(ptrace.SpanKindClient)
 	clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
 	clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
-	clientSpan.Attributes().Insert("some-attribute", pcommon.NewValueString("val")) // Attribute selected as dimension for metrics
+	clientSpan.Attributes().UpsertString("some-attribute", "val") // Attribute selected as dimension for metrics
 
 	serverSpan := scopeSpans.Spans().AppendEmpty()
 	serverSpan.SetName("server span")
diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go
index c1121448231d5..bf15554c1e5d6 100644
--- a/processor/spanmetricsprocessor/processor.go
+++ b/processor/spanmetricsprocessor/processor.go
@@ -42,7 +42,6 @@ const (
 	spanKindKey        = "span.kind"   // OpenTelemetry non-standard constant.
 	statusCodeKey      = "status.code" // OpenTelemetry non-standard constant.
 	metricKeySeparator = string(byte(0))
-	traceIDKey         = "trace_id"
 
 	defaultDimensionsCacheSize = 1000
 )
@@ -55,6 +54,7 @@ var (
 )
 
 type exemplarData struct {
 	traceID pcommon.TraceID
+	spanID  pcommon.SpanID
 	value   float64
 }
@@ -392,7 +392,7 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
 	p.cache(serviceName, span, key, resourceAttr)
 	p.updateCallMetrics(key)
 	p.updateLatencyMetrics(key, latencyInMilliseconds, index)
-	p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID())
+	p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
 }
 
 // updateCallMetrics increments the call count for the given metric key.
@@ -411,13 +411,14 @@ func (p *processorImp) resetAccumulatedMetrics() {
 }
 
 // updateLatencyExemplars sets the histogram exemplars for the given metric key and append the exemplar data.
-func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID) {
+func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
 	if _, ok := p.latencyExemplarsData[key]; !ok {
 		p.latencyExemplarsData[key] = []exemplarData{}
 	}
 	e := exemplarData{
 		traceID: traceID,
+		spanID:  spanID,
 		value:   value,
 	}
 	p.latencyExemplarsData[key] = append(p.latencyExemplarsData[key], e)
@@ -561,16 +562,14 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta
 	for _, ed := range exemplarsData {
 		value := ed.value
 		traceID := ed.traceID
+		spanID := ed.spanID
 
 		exemplar := es.AppendEmpty()
-
-		if traceID.IsEmpty() {
-			continue
-		}
-
 		exemplar.SetDoubleVal(value)
 		exemplar.SetTimestamp(timestamp)
-		exemplar.FilteredAttributes().UpsertString(traceIDKey, traceID.HexString())
+		exemplar.SetTraceID(traceID)
+		exemplar.SetSpanID(spanID)
+
 	}
 
 	es.CopyTo(exemplars)
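Note: with the spanmetrics change above, exemplars now carry their IDs in the Exemplar type's own TraceID/SpanID fields (per the metrics data model) instead of a trace_id filtered attribute. A self-contained sketch of the new write path — appendExemplar is a hypothetical name; the calls are the ones setLatencyExemplars now makes:

    package sketch

    import (
    	"time"

    	"go.opentelemetry.io/collector/pdata/pcommon"
    	"go.opentelemetry.io/collector/pdata/pmetric"
    )

    // appendExemplar records one latency exemplar with typed ID fields,
    // mirroring setLatencyExemplars above; no FilteredAttributes involved.
    func appendExemplar(es pmetric.ExemplarSlice, value float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
    	e := es.AppendEmpty()
    	e.SetDoubleVal(value)
    	e.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
    	e.SetTraceID(traceID)
    	e.SetSpanID(spanID)
    }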
diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go
index 80d80c32e6790..ad75585220ae5 100644
--- a/processor/spanmetricsprocessor/processor_test.go
+++ b/processor/spanmetricsprocessor/processor_test.go
@@ -631,6 +631,7 @@ func initSpan(span span, s ptrace.Span) {
 	s.Attributes().UpsertEmptyMap(mapAttrName)
 	s.Attributes().UpsertEmptySlice(arrayAttrName)
 	s.SetTraceID(pcommon.NewTraceID([16]byte{byte(42)}))
+	s.SetSpanID(pcommon.NewSpanID([8]byte{byte(42)}))
 }
 
 func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExporter, component.TracesExporter) {
@@ -841,21 +842,23 @@ func TestSetLatencyExemplars(t *testing.T) {
 	// ----- conditions -------------------------------------------------------
 	traces := buildSampleTrace()
 	traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
+	spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
 	exemplarSlice := pmetric.NewExemplarSlice()
 	timestamp := pcommon.NewTimestampFromTime(time.Now())
 	value := float64(42)
 
-	ed := []exemplarData{{traceID: traceID, value: value}}
+	ed := []exemplarData{{traceID: traceID, spanID: spanID, value: value}}
 
 	// ----- call -------------------------------------------------------------
 	setLatencyExemplars(ed, timestamp, exemplarSlice)
 
 	// ----- verify -----------------------------------------------------------
-	traceIDValue, exist := exemplarSlice.At(0).FilteredAttributes().Get(traceIDKey)
+	traceIDValue := exemplarSlice.At(0).TraceID()
+	spanIDValue := exemplarSlice.At(0).SpanID()
 	assert.NotEmpty(t, exemplarSlice)
-	assert.True(t, exist)
-	assert.Equal(t, traceIDValue.AsString(), traceID.HexString())
+	assert.Equal(t, traceIDValue, traceID)
+	assert.Equal(t, spanIDValue, spanID)
 	assert.Equal(t, exemplarSlice.At(0).Timestamp(), timestamp)
 	assert.Equal(t, exemplarSlice.At(0).DoubleVal(), value)
 }
@@ -866,18 +869,19 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) {
 	cfg := factory.CreateDefaultConfig().(*Config)
 	traces := buildSampleTrace()
 	traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
+	spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
 	key := metricKey("metricKey")
 	next := new(consumertest.TracesSink)
 	p, err := newProcessor(zaptest.NewLogger(t), cfg, next)
 	value := float64(42)
 
 	// ----- call -------------------------------------------------------------
-	p.updateLatencyExemplars(key, value, traceID)
+	p.updateLatencyExemplars(key, value, traceID, spanID)
 
 	// ----- verify -----------------------------------------------------------
 	assert.NoError(t, err)
 	assert.NotEmpty(t, p.latencyExemplarsData[key])
-	assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, value: value})
+	assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, spanID: spanID, value: value})
 }
 
 func TestProcessorResetExemplarData(t *testing.T) {
diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go
index a83c9011b2260..256ad1183bd24 100644
--- a/processor/transformprocessor/internal/metrics/processor.go
+++ b/processor/transformprocessor/internal/metrics/processor.go
@@ -54,7 +54,7 @@ func (p *Processor) ProcessMetrics(_ context.Context, td pmetric.Metrics) (pmetr
 			case pmetric.MetricDataTypeSum:
 				p.handleNumberDataPoints(metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
 			case pmetric.MetricDataTypeGauge:
-				p.handleNumberDataPoints(metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
+				p.handleNumberDataPoints(metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
 			case pmetric.MetricDataTypeHistogram:
 				p.handleHistogramDataPoints(metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource())
 			case pmetric.MetricDataTypeExponentialHistogram:
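Note: the one-line transformprocessor fix above matters because pdata's typed accessors panic when the metric's actual data type differs — calling Sum() on a gauge-typed metric aborts collection. A sketch of the corrected dispatch in isolation (the dataPoints helper is hypothetical; the case arms mirror ProcessMetrics):

    package sketch

    import "go.opentelemetry.io/collector/pdata/pmetric"

    // dataPoints selects the number-data-point slice for a metric. Each case
    // must use the accessor matching the metric's actual type: before the
    // fix, the Gauge case called metric.Sum() and panicked.
    func dataPoints(metric pmetric.Metric) pmetric.NumberDataPointSlice {
    	switch metric.DataType() {
    	case pmetric.MetricDataTypeSum:
    		return metric.Sum().DataPoints()
    	case pmetric.MetricDataTypeGauge:
    		return metric.Gauge().DataPoints() // was metric.Sum().DataPoints()
    	default:
    		return pmetric.NewNumberDataPointSlice()
    	}
    }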
diff --git a/receiver/splunkhecreceiver/splunk_to_logdata.go b/receiver/splunkhecreceiver/splunk_to_logdata.go
index 865b8f8cc11f6..1d447d59faecb 100644
--- a/receiver/splunkhecreceiver/splunk_to_logdata.go
+++ b/receiver/splunkhecreceiver/splunk_to_logdata.go
@@ -25,8 +25,8 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
 )
 
-const (
-	cannotConvertValue = "cannot convert field value to attribute"
+var (
+	errCannotConvertValue = errors.New("cannot convert field value to attribute")
 )
 
 // splunkHecToLogData transforms splunk events into logs
@@ -35,14 +35,11 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust
 	rl := ld.ResourceLogs().AppendEmpty()
 	sl := rl.ScopeLogs().AppendEmpty()
 	for _, event := range events {
-		attrValue, err := convertInterfaceToAttributeValue(logger, event.Event)
-		if err != nil {
-			logger.Debug("Unsupported value conversion", zap.Any("value", event.Event))
-			return ld, errors.New(cannotConvertValue)
-		}
-		logRecord := sl.LogRecords().AppendEmpty()
 		// The SourceType field is the most logical "name" of the event.
-		attrValue.CopyTo(logRecord.Body())
+		logRecord := sl.LogRecords().AppendEmpty()
+		if err := convertToValue(logger, event.Event, logRecord.Body()); err != nil {
+			return ld, err
+		}
 
 		// Splunk timestamps are in seconds so convert to nanos by multiplying
 		// by 1 billion.
@@ -50,6 +47,20 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust
 			logRecord.SetTimestamp(pcommon.Timestamp(*event.Time * 1e9))
 		}
 
+		// Set event fields first, so the specialized attributes overwrite them if needed.
+		keys := make([]string, 0, len(event.Fields))
+		for k := range event.Fields {
+			keys = append(keys, k)
+		}
+		sort.Strings(keys)
+		for _, key := range keys {
+			val := event.Fields[key]
+			err := convertToValue(logger, val, logRecord.Attributes().UpsertEmpty(key))
+			if err != nil {
+				return ld, err
+			}
+		}
+
 		if event.Host != "" {
 			logRecord.Attributes().UpsertString(config.HecToOtelAttrs.Host, event.Host)
 		}
@@ -65,70 +76,47 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust
 		if resourceCustomizer != nil {
 			resourceCustomizer(rl.Resource())
 		}
-		keys := make([]string, 0, len(event.Fields))
-		for k := range event.Fields {
-			keys = append(keys, k)
-		}
-		sort.Strings(keys)
-		for _, key := range keys {
-			val := event.Fields[key]
-			attrValue, err := convertInterfaceToAttributeValue(logger, val)
-			if err != nil {
-				return ld, err
-			}
-			logRecord.Attributes().Insert(key, attrValue)
-		}
 	}
 
 	return ld, nil
 }
 
-func convertInterfaceToAttributeValue(logger *zap.Logger, originalValue interface{}) (pcommon.Value, error) {
-	if originalValue == nil {
-		return pcommon.NewValueEmpty(), nil
-	} else if value, ok := originalValue.(string); ok {
-		return pcommon.NewValueString(value), nil
-	} else if value, ok := originalValue.(int64); ok {
-		return pcommon.NewValueInt(value), nil
-	} else if value, ok := originalValue.(float64); ok {
-		return pcommon.NewValueDouble(value), nil
-	} else if value, ok := originalValue.(bool); ok {
-		return pcommon.NewValueBool(value), nil
-	} else if value, ok := originalValue.(map[string]interface{}); ok {
-		mapValue, err := convertToAttributeMap(logger, value)
-		if err != nil {
-			return pcommon.NewValueEmpty(), err
-		}
-		return mapValue, nil
-	} else if value, ok := originalValue.([]interface{}); ok {
-		arrValue, err := convertToSliceVal(logger, value)
-		if err != nil {
-			return pcommon.NewValueEmpty(), err
-		}
-		return arrValue, nil
-	} else {
-		logger.Debug("Unsupported value conversion", zap.Any("value", originalValue))
-		return pcommon.NewValueEmpty(), errors.New(cannotConvertValue)
+func convertToValue(logger *zap.Logger, src interface{}, dest pcommon.Value) error {
+	switch value := src.(type) {
+	case nil:
+	case string:
+		dest.SetStringVal(value)
+	case int64:
+		dest.SetIntVal(value)
+	case float64:
+		dest.SetDoubleVal(value)
+	case bool:
+		dest.SetBoolVal(value)
+	case map[string]interface{}:
+		return convertToAttributeMap(logger, value, dest)
+	case []interface{}:
+		return convertToSliceVal(logger, value, dest)
+	default:
+		logger.Debug("Unsupported value conversion", zap.Any("value", src))
+		return errCannotConvertValue
 	}
+	return nil
 }
 
-func convertToSliceVal(logger *zap.Logger, value []interface{}) (pcommon.Value, error) {
-	attrVal := pcommon.NewValueSlice()
-	arr := attrVal.SliceVal()
+func convertToSliceVal(logger *zap.Logger, value []interface{}, dest pcommon.Value) error {
+	arr := dest.SetEmptySliceVal()
 	for _, elt := range value {
-		translatedElt, err := convertInterfaceToAttributeValue(logger, elt)
+		err := convertToValue(logger, elt, arr.AppendEmpty())
 		if err != nil {
-			return attrVal, err
+			return err
 		}
-		tgt := arr.AppendEmpty()
-		translatedElt.CopyTo(tgt)
 	}
-	return attrVal, nil
+	return nil
 }
 
-func convertToAttributeMap(logger *zap.Logger, value map[string]interface{}) (pcommon.Value, error) {
-	attrVal := pcommon.NewValueMap()
-	attrMap := attrVal.MapVal()
+func convertToAttributeMap(logger *zap.Logger, value map[string]interface{}, dest pcommon.Value) error {
+	attrMap := dest.SetEmptyMapVal()
 	keys := make([]string, 0, len(value))
 	for k := range value {
 		keys = append(keys, k)
@@ -136,11 +124,9 @@ func convertToAttributeMap(logger *zap.Logger, value map[string]interface{}) (pc
 	sort.Strings(keys)
 	for _, k := range keys {
 		v := value[k]
-		translatedElt, err := convertInterfaceToAttributeValue(logger, v)
-		if err != nil {
-			return attrVal, err
+		if err := convertToValue(logger, v, attrMap.UpsertEmpty(k)); err != nil {
+			return err
 		}
-		attrMap.Insert(k, translatedElt)
 	}
-	return attrVal, nil
+	return nil
 }
diff --git a/receiver/splunkhecreceiver/splunk_to_logdata_test.go b/receiver/splunkhecreceiver/splunk_to_logdata_test.go
index 0bf1383e12923..8f0f63c7e1c85 100644
--- a/receiver/splunkhecreceiver/splunk_to_logdata_test.go
+++ b/receiver/splunkhecreceiver/splunk_to_logdata_test.go
@@ -189,11 +189,11 @@ func Test_SplunkHecToLogData(t *testing.T) {
 				logRecord := sl.LogRecords().AppendEmpty()
 				logRecord.Body().SetStringVal("value")
 				logRecord.SetTimestamp(pcommon.Timestamp(0))
+				logRecord.Attributes().UpsertString("foo", "bar")
 				logRecord.Attributes().UpsertString("myhost", "localhost")
 				logRecord.Attributes().UpsertString("mysource", "mysource")
 				logRecord.Attributes().UpsertString("mysourcetype", "mysourcetype")
 				logRecord.Attributes().UpsertString("myindex", "myindex")
-				logRecord.Attributes().UpsertString("foo", "bar")
 				return lrs
 			}(),
 			wantErr: nil,
@@ -216,71 +216,65 @@ func createLogsSlice(nanoseconds int) plog.ResourceLogsSlice {
 	logRecord := sl.LogRecords().AppendEmpty()
 	logRecord.Body().SetStringVal("value")
 	logRecord.SetTimestamp(pcommon.Timestamp(nanoseconds))
+	logRecord.Attributes().UpsertString("foo", "bar")
 	logRecord.Attributes().UpsertString("host.name", "localhost")
 	logRecord.Attributes().UpsertString("com.splunk.source", "mysource")
 	logRecord.Attributes().UpsertString("com.splunk.sourcetype", "mysourcetype")
 	logRecord.Attributes().UpsertString("com.splunk.index", "myindex")
-	logRecord.Attributes().UpsertString("foo", "bar")
 	return lrs
 }
 
-func Test_ConvertAttributeValueEmpty(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), nil)
-	assert.NoError(t, err)
+func TestConvertToValueEmpty(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), nil, value))
 	assert.Equal(t, pcommon.NewValueEmpty(), value)
 }
 
-func Test_ConvertAttributeValueString(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), "foo")
-	assert.NoError(t, err)
+func TestConvertToValueString(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), "foo", value))
 	assert.Equal(t, pcommon.NewValueString("foo"), value)
 }
 
-func Test_ConvertAttributeValueBool(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), false)
-	assert.NoError(t, err)
+func TestConvertToValueBool(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), false, value))
 	assert.Equal(t, pcommon.NewValueBool(false), value)
 }
 
-func Test_ConvertAttributeValueFloat(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), 12.3)
-	assert.NoError(t, err)
+func TestConvertToValueFloat(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), 12.3, value))
 	assert.Equal(t, pcommon.NewValueDouble(12.3), value)
 }
 
-func Test_ConvertAttributeValueMap(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), map[string]interface{}{"foo": "bar"})
-	assert.NoError(t, err)
+func TestConvertToValueMap(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), map[string]interface{}{"foo": "bar"}, value))
 	atts := pcommon.NewValueMap()
 	attMap := atts.MapVal()
 	attMap.UpsertString("foo", "bar")
 	assert.Equal(t, atts, value)
 }
 
-func Test_ConvertAttributeValueArray(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), []interface{}{"foo"})
-	assert.NoError(t, err)
+func TestConvertToValueArray(t *testing.T) {
+	value := pcommon.NewValueEmpty()
+	assert.NoError(t, convertToValue(zap.NewNop(), []interface{}{"foo"}, value))
 	arrValue := pcommon.NewValueSlice()
 	arr := arrValue.SliceVal()
 	arr.AppendEmpty().SetStringVal("foo")
 	assert.Equal(t, arrValue, value)
 }
 
-func Test_ConvertAttributeValueInvalid(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), splunk.Event{})
-	assert.Error(t, err)
-	assert.Equal(t, pcommon.NewValueEmpty(), value)
+func TestConvertToValueInvalid(t *testing.T) {
+	assert.Error(t, convertToValue(zap.NewNop(), splunk.Event{}, pcommon.NewValueEmpty()))
 }
 
-func Test_ConvertAttributeValueInvalidInMap(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), map[string]interface{}{"foo": splunk.Event{}})
-	assert.Error(t, err)
-	assert.Equal(t, pcommon.NewValueEmpty(), value)
+func TestConvertToValueInvalidInMap(t *testing.T) {
+	assert.Error(t, convertToValue(zap.NewNop(), map[string]interface{}{"foo": splunk.Event{}}, pcommon.NewValueEmpty()))
 }
 
-func Test_ConvertAttributeValueInvalidInArray(t *testing.T) {
-	value, err := convertInterfaceToAttributeValue(zap.NewNop(), []interface{}{splunk.Event{}})
-	assert.Error(t, err)
-	assert.Equal(t, pcommon.NewValueEmpty(), value)
+func TestConvertToValueInvalidInArray(t *testing.T) {
+	assert.Error(t, convertToValue(zap.NewNop(), []interface{}{splunk.Event{}}, pcommon.NewValueEmpty()))
 }
diff --git a/receiver/sqlqueryreceiver/metrics.go b/receiver/sqlqueryreceiver/metrics.go
index 5ab45de01784f..2f2a336f1769b 100644
--- a/receiver/sqlqueryreceiver/metrics.go
+++ b/receiver/sqlqueryreceiver/metrics.go
@@ -39,16 +39,16 @@ func rowToMetric(row metricRow, cfg MetricCfg, dest pmetric.Metric, startTime pc
 		return fmt.Errorf("rowToMetric: %w", err)
 	}
 	attrs := dataPoint.Attributes()
+	for k, v := range cfg.StaticAttributes {
+		attrs.UpsertString(k, v)
+	}
 	for _, columnName := range cfg.AttributeColumns {
 		if attrVal, found := row[columnName]; found {
-			attrs.InsertString(columnName, attrVal)
+			attrs.UpsertString(columnName, attrVal)
 		} else {
 			return fmt.Errorf("rowToMetric: attribute_column not found: '%s'", columnName)
 		}
 	}
-	for k, v := range cfg.StaticAttributes {
-		attrs.InsertString(k, v)
-	}
 	return nil
 }
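Note: the splunkhecreceiver rewrite and the sqlqueryreceiver reorder above share one pattern: write into a pdata destination in place and let later Upserts win. A minimal sketch of that style, assuming the pre-1.0 pdata API used throughout this diff (keys and values are illustrative only; presumably the sqlquery reorder exists so per-column values can override static defaults):

    package main

    import "go.opentelemetry.io/collector/pdata/pcommon"

    func main() {
    	attrs := pcommon.NewMap()

    	// In-place construction, as convertToValue now does: UpsertEmpty
    	// reserves a slot and SetEmptyMapVal turns it into a nested map,
    	// with no temporary Value built and copied over.
    	fields := attrs.UpsertEmpty("fields").SetEmptyMapVal()
    	fields.UpsertString("status", "ok")
    	fields.UpsertEmpty("count").SetIntVal(42)

    	// Upsert ordering, as rowToMetric now relies on: the later write
    	// wins, so a value from a result-set column replaces a static default.
    	attrs.UpsertString("db.name", "static-default")
    	attrs.UpsertString("db.name", "from-column")
    }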
diff --git a/unreleased/spanmetrics_exemplars_type_fields.yaml b/unreleased/spanmetrics_exemplars_type_fields.yaml
new file mode 100755
index 0000000000000..b04019b0db260
--- /dev/null
+++ b/unreleased/spanmetrics_exemplars_type_fields.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: spanmetricsprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Sets the TraceID and SpanID fields on the Exemplar type (as per the spec) and removes the use of FilteredAttributes to pass these values around.
+
+# One or more tracking issues related to the change
+issues: [13401]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/unreleased/transform-fix-panic-gauge-type.yaml b/unreleased/transform-fix-panic-gauge-type.yaml
new file mode 100755
index 0000000000000..700a6492b0e5d
--- /dev/null
+++ b/unreleased/transform-fix-panic-gauge-type.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: transformprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fixes a panic in the transformprocessor when handling Gauge metric types
+
+# One or more tracking issues related to the change
+issues: [13905]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: