diff --git a/receiver/skywalkingreceiver/go.mod b/receiver/skywalkingreceiver/go.mod index c44445a9d7e6b..c477f254070e0 100644 --- a/receiver/skywalkingreceiver/go.mod +++ b/receiver/skywalkingreceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywal go 1.17 require ( + github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/stretchr/testify v1.8.0 go.opentelemetry.io/collector v0.55.0 diff --git a/receiver/skywalkingreceiver/go.sum b/receiver/skywalkingreceiver/go.sum index 385ae55823127..35547bf2b7281 100644 --- a/receiver/skywalkingreceiver/go.sum +++ b/receiver/skywalkingreceiver/go.sum @@ -191,6 +191,8 @@ github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go b/receiver/skywalkingreceiver/skywalkingproto_to_traces.go index b12e5b0768824..fb5ff60fc82e1 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go +++ b/receiver/skywalkingreceiver/skywalkingproto_to_traces.go @@ -15,11 +15,14 @@ package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" import ( - "encoding/binary" + "bytes" + 
"encoding/hex" "reflect" + "strconv" "time" "unsafe" + "github.com/google/uuid" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.8.0" @@ -32,6 +35,10 @@ const ( AttributeParentService = "parent.service" AttributeParentInstance = "parent.service.instance" AttributeParentEndpoint = "parent.endpoint" + AttributeSkywalkingSpanID = "sw8.span_id" + AttributeSkywalkingTraceID = "sw8.trace_id" + AttributeSkywalkingSegmentID = "sw8.segment_id" + AttributeSkywalkingParentSpanID = "sw8.parent_span_id" AttributeNetworkAddressUsedAtPeer = "network.AddressUsedAtPeer" ) @@ -53,14 +60,17 @@ func SkywalkingToTraces(segment *agentV3.SegmentObject) ptrace.Traces { resourceSpan := traceData.ResourceSpans().AppendEmpty() rs := resourceSpan.Resource() + for _, span := range swSpans { swTagsToInternalResource(span, rs) - rs.Attributes().Insert(conventions.AttributeServiceName, pcommon.NewValueString(segment.GetService())) - rs.Attributes().Insert(conventions.AttributeServiceInstanceID, pcommon.NewValueString(segment.GetServiceInstance())) } + rs.Attributes().Insert(conventions.AttributeServiceName, pcommon.NewValueString(segment.GetService())) + rs.Attributes().Insert(conventions.AttributeServiceInstanceID, pcommon.NewValueString(segment.GetServiceInstance())) + rs.Attributes().Insert(AttributeSkywalkingTraceID, pcommon.NewValueString(segment.GetTraceId())) + il := resourceSpan.ScopeSpans().AppendEmpty() - swSpansToSpanSlice(segment.GetTraceId(), swSpans, il.Spans()) + swSpansToSpanSlice(segment.GetTraceId(), segment.GetTraceSegmentId(), swSpans, il.Spans()) return traceData } @@ -86,7 +96,7 @@ func swTagsToInternalResource(span *agentV3.SpanObject, dest pcommon.Resource) { } } -func swSpansToSpanSlice(traceID string, spans []*agentV3.SpanObject, dest ptrace.SpanSlice) { +func swSpansToSpanSlice(traceID string, segmentID string, spans []*agentV3.SpanObject, dest ptrace.SpanSlice) { if 
len(spans) == 0 { return } @@ -96,17 +106,19 @@ func swSpansToSpanSlice(traceID string, spans []*agentV3.SpanObject, dest ptrace if span == nil { continue } - swSpanToSpan(traceID, span, dest.AppendEmpty()) + swSpanToSpan(traceID, segmentID, span, dest.AppendEmpty()) } } -func swSpanToSpan(traceID string, span *agentV3.SpanObject, dest ptrace.Span) { - dest.SetTraceID(stringToTraceID(traceID)) - dest.SetSpanID(uInt32ToSpanID(uint32(span.GetSpanId()))) +func swSpanToSpan(traceID string, segmentID string, span *agentV3.SpanObject, dest ptrace.Span) { + dest.SetTraceID(swTraceIDToTraceID(traceID)) + // skywalking defines segmentId + spanId as unique identifier + // so use segmentId to convert to an unique otel-span + dest.SetSpanID(segmentIDToSpanID(segmentID, uint32(span.GetSpanId()))) // parent spanid = -1, means(root span) no parent span in skywalking,so just make otlp's parent span id empty. if span.ParentSpanId != -1 { - dest.SetParentSpanID(uInt32ToSpanID(uint32(span.GetParentSpanId()))) + dest.SetParentSpanID(segmentIDToSpanID(segmentID, uint32(span.GetParentSpanId()))) } dest.SetName(span.OperationName) @@ -121,6 +133,8 @@ func swSpanToSpan(traceID string, span *agentV3.SpanObject, dest ptrace.Span) { attrs.Clear() } + attrs.InsertString(AttributeSkywalkingSegmentID, segmentID) + setSwSpanIDToAttributes(span, attrs) setInternalSpanStatus(span, dest.Status()) switch { @@ -154,8 +168,8 @@ func swReferencesToSpanLinks(refs []*agentV3.SegmentReference, dest ptrace.SpanL for _, ref := range refs { link := dest.AppendEmpty() - link.SetTraceID(stringToTraceID(ref.TraceId)) - link.SetSpanID(stringToParentSpanID(ref.ParentTraceSegmentId)) + link.SetTraceID(swTraceIDToTraceID(ref.TraceId)) + link.SetSpanID(segmentIDToSpanID(ref.ParentTraceSegmentId, uint32(ref.ParentSpanId))) link.SetTraceState("") kvParis := []*common.KeyStringValuePair{ { @@ -193,6 +207,13 @@ func setInternalSpanStatus(span *agentV3.SpanObject, dest ptrace.SpanStatus) { } } +func 
setSwSpanIDToAttributes(span *agentV3.SpanObject, dest pcommon.Map) { + dest.InsertInt(AttributeSkywalkingSpanID, int64(span.GetSpanId())) + if span.ParentSpanId != -1 { + dest.InsertInt(AttributeSkywalkingParentSpanID, int64(span.GetParentSpanId())) + } +} + func swLogsToSpanEvents(logs []*agentV3.Log, dest ptrace.SpanEventSlice) { if len(logs) == 0 { return @@ -235,38 +256,102 @@ func microsecondsToTimestamp(ms int64) pcommon.Timestamp { return pcommon.NewTimestampFromTime(time.UnixMilli(ms)) } -func stringToTraceID(traceID string) pcommon.TraceID { - return pcommon.NewTraceID(unsafeStringToBytes(traceID)) -} +func swTraceIDToTraceID(traceID string) pcommon.TraceID { + // skywalking traceid format: + // de5980b8-fce3-4a37-aab9-b4ac3af7eedd: from browser/js-sdk/envoy/nginx-lua sdk/py-agent + // 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from java-agent -func stringToParentSpanID(traceID string) pcommon.SpanID { - return pcommon.NewSpanID(unsafeStringTo8Bytes(traceID)) + if len(traceID) <= 36 { // 36: uuid length (rfc4122) + uid, err := uuid.Parse(traceID) + if err != nil { + return pcommon.InvalidTraceID() + } + return pcommon.NewTraceID(uid) + } + return pcommon.NewTraceID(swStringToUUID(traceID, 0)) } -// uInt32ToSpanID converts the uint64 representation of a SpanID to pcommon.SpanID. 
-func uInt32ToSpanID(id uint32) pcommon.SpanID { - spanID := [8]byte{} - binary.BigEndian.PutUint32(spanID[:], id) - return pcommon.NewSpanID(spanID) +func segmentIDToSpanID(segmentID string, spanID uint32) pcommon.SpanID { + // skywalking segmentid format: + // 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from TraceSegmentId + // 56a5e1c519ae4c76a2b8b11d92cead7f: from ParentTraceSegmentId + + if len(segmentID) < 32 { + return pcommon.InvalidSpanID() + } + return pcommon.NewSpanID(uuidTo8Bytes(swStringToUUID(segmentID, spanID))) } -func unsafeStringToBytes(s string) [16]byte { - p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) +func swStringToUUID(s string, extra uint32) (dst [16]byte) { + // there are 2 possible formats for 's': + // s format = 56a5e1c519ae4c76a2b8b11d92cead7f.0000000000.000000000000000000 + // ^ start(length=32) ^ mid(u32) ^ last(u64) + // uid = UUID(start) XOR ([4]byte(extra) . [4]byte(uint32(mid)) . [8]byte(uint64(last))) + + // s format = 56a5e1c519ae4c76a2b8b11d92cead7f + // ^ start(length=32) + // uid = UUID(start) XOR [4]byte(extra) + + if len(s) < 32 { + return + } + + t := unsafeGetBytes(s) + var uid [16]byte + _, err := hex.Decode(uid[:], t[:32]) + if err != nil { + return uid + } + + for i := 0; i < 4; i++ { + uid[i] ^= byte(extra) + extra >>= 8 + } - var b [16]byte - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Data = uintptr(p) - hdr.Cap = len(s) - hdr.Len = len(s) - return b + if len(s) == 32 { + return uid + } + + index1 := bytes.IndexByte(t, '.') + index2 := bytes.LastIndexByte(t, '.') + if index1 != 32 || index2 < 0 { + return + } + + mid, err := strconv.Atoi(s[index1+1 : index2]) + if err != nil { + return + } + + last, err := strconv.Atoi(s[index2+1:]) + if err != nil { + return + } + + for i := 4; i < 8; i++ { + uid[i] ^= byte(mid) + mid >>= 8 + } + + for i := 8; i < 16; i++ { + uid[i] ^= byte(last) + last >>= 8 + } + + return uid +} + +func uuidTo8Bytes(uuid [16]byte) [8]byte { + // 
high bit XOR low bit + var dst [8]byte + for i := 0; i < 8; i++ { + dst[i] = uuid[i] ^ uuid[i+8] + } + return dst } -func unsafeStringTo8Bytes(s string) [8]byte { - p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data) - var b [8]byte - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Data = uintptr(p) - hdr.Cap = len(s) - hdr.Len = len(s) - return b +func unsafeGetBytes(s string) []byte { + return (*[0x7fff0000]byte)(unsafe.Pointer( + (*reflect.StringHeader)(unsafe.Pointer(&s)).Data), + )[:len(s):len(s)] } diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go b/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go index f96b5d0f5cfaa..b0375b6d2f4a7 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go +++ b/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go @@ -160,6 +160,154 @@ func TestSwLogsToSpanEvents(t *testing.T) { }) } } + +func Test_stringToTraceID(t *testing.T) { + type args struct { + traceID string + } + tests := []struct { + name string + segmentObject args + want [16]byte + }{ + { + name: "mock-sw-normal-trace-id-rfc4122v4", + segmentObject: args{traceID: "de5980b8-fce3-4a37-aab9-b4ac3af7eedd"}, + want: [16]byte{222, 89, 128, 184, 252, 227, 74, 55, 170, 185, 180, 172, 58, 247, 238, 221}, + }, + { + name: "mock-sw-normal-trace-id-rfc4122", + segmentObject: args{traceID: "de5980b8fce34a37aab9b4ac3af7eedd"}, + want: [16]byte{222, 89, 128, 184, 252, 227, 74, 55, 170, 185, 180, 172, 58, 247, 238, 221}, + }, + { + name: "mock-sw-trace-id-length-shorter", + segmentObject: args{traceID: "de59"}, + want: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + name: "mock-sw-trace-id-length-java-agent", + segmentObject: args{traceID: "de5980b8fce34a37aab9b4ac3af7eedd.1.16563474296430001"}, + want: [16]byte{222, 89, 128, 184, 253, 227, 74, 55, 27, 228, 27, 205, 94, 47, 212, 221}, + }, + { + name: "mock-sw-trace-id-illegal", + segmentObject: args{traceID: 
".,<>?/-=+MNop"}, + want: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := swTraceIDToTraceID(tt.segmentObject.traceID) + assert.Equal(t, tt.want, got.Bytes()) + }) + } +} + +func Test_stringToTraceID_Unique(t *testing.T) { + type args struct { + traceID string + } + tests := []struct { + name string + segmentObject args + }{ + { + name: "mock-sw-trace-id-unique-1", + segmentObject: args{traceID: "de5980b8fce34a37aab9b4ac3af7eedd.133.16563474296430001"}, + }, + { + name: "mock-sw-trace-id-unique-2", + segmentObject: args{traceID: "de5980b8fce34a37aab9b4ac3af7eedd.133.16534574123430001"}, + }, + } + + var results [2][16]byte + for i := 0; i < 2; i++ { + tt := tests[i] + t.Run(tt.name, func(t *testing.T) { + got := swTraceIDToTraceID(tt.segmentObject.traceID) + results[i] = got.Bytes() + }) + } + assert.NotEqual(t, tests[0].segmentObject.traceID, t, tests[1].segmentObject.traceID) + assert.NotEqual(t, results[0], results[1]) +} + +func Test_segmentIdToSpanId(t *testing.T) { + type args struct { + segmentID string + spanID uint32 + } + tests := []struct { + name string + args args + want [8]byte + }{ + { + name: "mock-sw-span-id-normal", + args: args{segmentID: "4f2f27748b8e44ecaf18fe0347194e86.33.16560607369950066", spanID: 123}, + want: [8]byte{233, 196, 85, 168, 37, 66, 48, 106}, + }, + { + name: "mock-sw-span-id-python-agent", + args: args{segmentID: "4f2f27748b8e44ecaf18fe0347194e86", spanID: 123}, + want: [8]byte{155, 55, 217, 119, 204, 151, 10, 106}, + }, + { + name: "mock-sw-span-id-short", + args: args{segmentID: "16560607369950066", spanID: 12}, + want: [8]byte{0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + name: "mock-sw-span-id-illegal-1", + args: args{segmentID: "1", spanID: 2}, + want: [8]byte{0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + name: "mock-sw-span-id-illegal-char", + args: args{segmentID: ".,<>?/-=+MNop", spanID: 2}, + want: [8]byte{0, 0, 0, 0, 0, 0, 0, 0}, + }, + } + for 
_, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := segmentIDToSpanID(tt.args.segmentID, tt.args.spanID) + assert.Equal(t, tt.want, got.Bytes()) + }) + } +} + +func Test_segmentIdToSpanId_Unique(t *testing.T) { + type args struct { + segmentID string + spanID uint32 + } + tests := []struct { + name string + args args + }{ + { + name: "mock-sw-span-id-unique-1", + args: args{segmentID: "4f2f27748b8e44ecaf18fe0347194e86.33.16560607369950066", spanID: 123}, + }, + { + name: "mock-sw-span-id-unique-2", + args: args{segmentID: "4f2f27748b8e44ecaf18fe0347194e86.33.16560607369950066", spanID: 1}, + }, + } + var results [2][8]byte + for i := 0; i < 2; i++ { + tt := tests[i] + t.Run(tt.name, func(t *testing.T) { + got := segmentIDToSpanID(tt.args.segmentID, tt.args.spanID) + results[i] = got.Bytes() + }) + } + + assert.NotEqual(t, results[0], results[1]) +} + func generateTracesOneEmptyResourceSpans() ptrace.Span { td := ptrace.NewTraces() resourceSpan := td.ResourceSpans().AppendEmpty() diff --git a/receiver/skywalkingreceiver/trace_receiver.go b/receiver/skywalkingreceiver/trace_receiver.go index 32e22caab1c7b..02e02c7c66f2e 100644 --- a/receiver/skywalkingreceiver/trace_receiver.go +++ b/receiver/skywalkingreceiver/trace_receiver.go @@ -222,7 +222,7 @@ func (sr *swReceiver) httpHandler(rsp http.ResponseWriter, r *http.Request) { } for _, segment := range data { - err = consumeTraces(context.Background(), segment, sr.nextConsumer) + err = consumeTraces(r.Context(), segment, sr.nextConsumer) if err != nil { fmt.Printf("cannot consume traces, %v", err) } diff --git a/receiver/skywalkingreceiver/tracing_report_service.go b/receiver/skywalkingreceiver/tracing_report_service.go index f3ad4fcdb1f5c..05ee826bfda56 100644 --- a/receiver/skywalkingreceiver/tracing_report_service.go +++ b/receiver/skywalkingreceiver/tracing_report_service.go @@ -41,7 +41,7 @@ func (s *traceSegmentReportService) Collect(stream agent.TraceSegmentReportServi return err } - err = 
consumeTraces(context.Background(), segmentObject, s.sr.nextConsumer) + err = consumeTraces(stream.Context(), segmentObject, s.sr.nextConsumer) if err != nil { return stream.SendAndClose(&common.Commands{}) } diff --git a/unreleased/skywalkingreceiver-fix.yaml b/unreleased/skywalkingreceiver-fix.yaml new file mode 100644 index 0000000000000..c2e14c33e138e --- /dev/null +++ b/unreleased/skywalkingreceiver-fix.yaml @@ -0,0 +1,4 @@ +change_type: bug_fix +component: skywalkingreceiver +note: Fix skywalking traceid and spanid conversion +issues: [11562] \ No newline at end of file