From 56fb2aa122c26117271485a3dabb465ebece412c Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Mon, 27 Mar 2023 20:26:42 +0800 Subject: [PATCH 01/14] [receiver/skywalking]: 1. Restructuring the directory/structure 2. Adding an HTTP trace Reception unit test --- cmd/otelcontribcol/config.yaml | 31 ++++ receiver/skywalkingreceiver/factory.go | 26 +++- receiver/skywalkingreceiver/factory_test.go | 31 ++-- receiver/skywalkingreceiver/go.mod | 5 +- receiver/skywalkingreceiver/go.sum | 5 +- .../trace}/skywalkingproto_to_traces.go | 2 +- .../trace}/skywalkingproto_to_traces_test.go | 91 ++++++++++- .../internal/trace/tracing_report_service.go | 143 ++++++++++++++++++ ...ace_receiver.go => skywalking_receiver.go} | 101 +++---------- ...er_test.go => skywalking_receiver_test.go} | 69 +++++++-- .../tracing_report_service.go | 72 --------- 11 files changed, 392 insertions(+), 184 deletions(-) create mode 100644 cmd/otelcontribcol/config.yaml rename receiver/skywalkingreceiver/{ => internal/trace}/skywalkingproto_to_traces.go (98%) rename receiver/skywalkingreceiver/{ => internal/trace}/skywalkingproto_to_traces_test.go (78%) create mode 100644 receiver/skywalkingreceiver/internal/trace/tracing_report_service.go rename receiver/skywalkingreceiver/{trace_receiver.go => skywalking_receiver.go} (71%) rename receiver/skywalkingreceiver/{trace_receiver_test.go => skywalking_receiver_test.go} (72%) delete mode 100644 receiver/skywalkingreceiver/tracing_report_service.go diff --git a/cmd/otelcontribcol/config.yaml b/cmd/otelcontribcol/config.yaml new file mode 100644 index 0000000000000..e966e3bac98a6 --- /dev/null +++ b/cmd/otelcontribcol/config.yaml @@ -0,0 +1,31 @@ +extensions: +receivers: +# otlp: +# protocols: +# grpc: +# http: + skywalking: + protocols: + grpc: + endpoint: 0.0.0.0:11800 + http: + endpoint: 0.0.0.0:12800 +exporters: + logging: + loglevel: -1 +service: + + pipelines: + + traces: + receivers: [skywalking] + processors: + exporters: [logging] +# metrics: +# receivers: [skywalking] +# processors: +# exporters: [logging] + + + + extensions: [] \ No newline at end of file diff --git a/receiver/skywalkingreceiver/factory.go b/receiver/skywalkingreceiver/factory.go index 333e4b8104bd1..4e32e6ef47eca 100644 --- a/receiver/skywalkingreceiver/factory.go +++ b/receiver/skywalkingreceiver/factory.go @@ -22,6 +22,8 @@ import ( "net" "strconv" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" @@ -80,6 +82,24 @@ func createTracesReceiver( // that Skywalking receiver understands. rCfg := cfg.(*Config) + c, err := createConfiguration(rCfg) + if err != nil { + return nil, err + } + + r := receivers.GetOrAdd(cfg, func() component.Component { + return newSkywalkingReceiver(c, set) + }) + + if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil { + return nil, err + } + + return r, nil +} + +// create the config that Skywalking receiver will use. +func createConfiguration(rCfg *Config) (*configuration, error) { var err error var c configuration // Set ports @@ -96,9 +116,7 @@ func createTracesReceiver( return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err) } } - - // Create the receiver. - return newSkywalkingReceiver(&c, nextConsumer, set) + return &c, nil } // extract the port number from string in "address:port" format. If the @@ -117,3 +135,5 @@ func extractPortFromEndpoint(endpoint string) (int, error) { } return int(port), nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/receiver/skywalkingreceiver/factory_test.go b/receiver/skywalkingreceiver/factory_test.go index 776c0e14d9719..173dc448939d9 100644 --- a/receiver/skywalkingreceiver/factory_test.go +++ b/receiver/skywalkingreceiver/factory_test.go @@ -19,6 +19,9 @@ import ( "path/filepath" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "go.opentelemetry.io/collector/consumer/consumertest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -54,8 +57,9 @@ func TestCreateReceiver(t *testing.T) { Transport: "tcp", }, } + traceSink := new(consumertest.TracesSink) set := receivertest.NewNopCreateSettings() - tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") @@ -75,7 +79,8 @@ func TestCreateReceiverGeneralConfig(t *testing.T) { require.NoError(t, component.UnmarshalConfig(sub, cfg)) set := receivertest.NewNopCreateSettings() - tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") @@ -94,11 +99,12 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) { Transport: "tcp", }, } + traceSink := new(consumertest.TracesSink) set := receivertest.NewNopCreateSettings() - r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) - + r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, 11800, r.(*swReceiver).config.CollectorGRPCPort, "grpc port should be default") + assert.Equal(t, 11800, r.(*sharedcomponent.SharedComponent). + Unwrap().(*swReceiver).config.CollectorGRPCPort, "grpc port should be default") } func TestCreateTLSGPRCEndpoint(t *testing.T) { @@ -118,8 +124,8 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) { }, } set := receivertest.NewNopCreateSettings() - - _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "tls-enabled receiver creation failed") } @@ -138,8 +144,8 @@ func TestCreateTLSHTTPEndpoint(t *testing.T) { } set := receivertest.NewNopCreateSettings() - - _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "tls-enabled receiver creation failed") } @@ -151,8 +157,9 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) { Endpoint: defaultHTTPBindEndpoint, } set := receivertest.NewNopCreateSettings() - r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) - + traceSink := new(consumertest.TracesSink) + r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, 12800, r.(*swReceiver).config.CollectorHTTPPort, "http port should be default") + assert.Equal(t, 12800, r.(*sharedcomponent.SharedComponent). + Unwrap().(*swReceiver).config.CollectorHTTPPort, "http port should be default") } diff --git a/receiver/skywalkingreceiver/go.mod b/receiver/skywalkingreceiver/go.mod index dafdd2eaf22fa..ad6b5033dbb6b 100644 --- a/receiver/skywalkingreceiver/go.mod +++ b/receiver/skywalkingreceiver/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.2 go.opentelemetry.io/collector v0.74.0 go.opentelemetry.io/collector/component v0.74.0 @@ -22,7 +23,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -37,7 +37,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.17 // indirect - github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.8.3 // indirect go.opencensus.io v0.24.0 // indirect @@ -57,4 +56,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + retract v0.65.0 diff --git a/receiver/skywalkingreceiver/go.sum b/receiver/skywalkingreceiver/go.sum index f274af9bac68e..1c069b627ec9a 100644 --- a/receiver/skywalkingreceiver/go.sum +++ b/receiver/skywalkingreceiver/go.sum @@ -68,7 +68,6 @@ github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -244,7 +243,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= -github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -435,7 +433,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -508,8 +505,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go similarity index 98% rename from receiver/skywalkingreceiver/skywalkingproto_to_traces.go rename to receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go index efdc7f881fde2..ab1c7a7fda4ca 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" +package trace import ( "bytes" diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go similarity index 78% rename from receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go rename to receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go index 5262a32be43e5..bc60b63da0641 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -package skywalkingreceiver +package trace import ( "strconv" "testing" + "time" + + common "skywalking.apache.org/repo/goapi/collect/common/v3" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) @@ -316,3 +320,88 @@ func generateTracesOneEmptyResourceSpans() ptrace.Span { il.Spans().AppendEmpty() return il.Spans().At(0) } + +func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { + seq := strconv.Itoa(sequence) + return &agent.SegmentObject{ + TraceId: "trace" + seq, + TraceSegmentId: "trace-segment" + seq, + Service: "demo-segmentReportService" + seq, + ServiceInstance: "demo-instance" + seq, + IsSizeLimited: false, + Spans: []*agent.SpanObject{ + { + SpanId: 1, + ParentSpanId: 0, + StartTime: time.Now().Unix(), + EndTime: time.Now().Unix() + 10, + OperationName: "operation" + seq, + Peer: "127.0.0.1:6666", + SpanType: agent.SpanType_Entry, + SpanLayer: agent.SpanLayer_Http, + ComponentId: 1, + IsError: false, + SkipAnalysis: false, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Logs: []*agent.Log{ + { + Time: time.Now().Unix(), + Data: []*common.KeyStringValuePair{ + { + Key: "log-key" + seq, + Value: "log-value" + seq, + }, + }, + }, + }, + Refs: []*agent.SegmentReference{ + { + RefType: agent.RefType_CrossThread, + TraceId: "trace" + seq, + ParentTraceSegmentId: "parent-trace-segment" + seq, + ParentSpanId: 0, + ParentService: "parent" + seq, + ParentServiceInstance: "parent" + seq, + ParentEndpoint: "parent" + seq, + NetworkAddressUsedAtPeer: "127.0.0.1:6666", + }, + }, + }, + { + SpanId: 2, + ParentSpanId: 1, + StartTime: time.Now().Unix(), + EndTime: time.Now().Unix() + 20, + OperationName: "operation" + seq, + Peer: "127.0.0.1:6666", + SpanType: agent.SpanType_Local, + SpanLayer: agent.SpanLayer_Http, + ComponentId: 2, + IsError: false, + SkipAnalysis: false, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Logs: []*agent.Log{ + { + Time: time.Now().Unix(), + Data: []*common.KeyStringValuePair{ + { + Key: "log-key" + seq, + Value: "log-value" + seq, + }, + }, + }, + }, + }, + }, + } +} diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go new file mode 100644 index 0000000000000..66ec2f35b7037 --- /dev/null +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -0,0 +1,143 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/receiver" + "google.golang.org/protobuf/proto" + common "skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" +) + +const ( + collectorHTTPTransport = "http" + grpcTransport = "grpc" + failing = "failing" +) + +type Receiver struct { + nextConsumer consumer.Traces + grpcObsrecv *obsreport.Receiver + httpObsrecv *obsreport.Receiver + agent.UnimplementedTraceSegmentReportServiceServer +} + +// New creates a new Receiver reference. +func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) { + grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: grpcTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: collectorHTTPTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + return &Receiver{ + nextConsumer: nextConsumer, + grpcObsrecv: grpcObsrecv, + httpObsrecv: httpObsrecv, + }, nil +} + +// Collect implements the service Collect traces func. +func (r *Receiver) Collect(stream agent.TraceSegmentReportService_CollectServer) error { + for { + segmentObject, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return stream.SendAndClose(&common.Commands{}) + } + return err + } + + err = consumeTraces(stream.Context(), segmentObject, r.nextConsumer) + if err != nil { + return stream.SendAndClose(&common.Commands{}) + } + } +} + +// CollectInSync implements the service CollectInSync traces func. +func (r *Receiver) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { + for _, segment := range segments.Segments { + marshaledSegment, err := proto.Marshal(segment) + if err != nil { + fmt.Printf("cannot marshal segemnt from sync, %v", err) + } + err = consumeTraces(ctx, segment, r.nextConsumer) + if err != nil { + fmt.Printf("cannot consume traces, %v", err) + } + fmt.Printf("receivec data:%s", marshaledSegment) + } + return &common.Commands{}, nil +} + +func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { + if segment == nil { + return nil + } + ptd := SkywalkingToTraces(segment) + return consumer.ConsumeTraces(ctx, ptd) +} + +func (r *Receiver) HTTPHandler(rsp http.ResponseWriter, req *http.Request) { + rsp.Header().Set("Content-Type", "application/json") + b, err := io.ReadAll(req.Body) + if err != nil { + response := &Response{Status: failing, Msg: err.Error()} + ResponseWithJSON(rsp, response, http.StatusBadRequest) + return + } + var data []*v3.SegmentObject + if err = json.Unmarshal(b, &data); err != nil { + fmt.Printf("cannot Unmarshal skywalking segment collection, %v", err) + } + + for _, segment := range data { + err = consumeTraces(req.Context(), segment, r.nextConsumer) + if err != nil { + fmt.Printf("cannot consume traces, %v", err) + } + } +} + +type Response struct { + Status string `json:"status"` + Msg string `json:"msg"` +} + +func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) { + rsp.WriteHeader(code) + _ = json.NewEncoder(rsp).Encode(response) +} diff --git a/receiver/skywalkingreceiver/trace_receiver.go b/receiver/skywalkingreceiver/skywalking_receiver.go similarity index 71% rename from receiver/skywalkingreceiver/trace_receiver.go rename to receiver/skywalkingreceiver/skywalking_receiver.go index 0acd9e35f9d3b..feba434791f4b 100644 --- a/receiver/skywalkingreceiver/trace_receiver.go +++ b/receiver/skywalkingreceiver/skywalking_receiver.go @@ -16,20 +16,19 @@ package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" - "encoding/json" "errors" "fmt" - "io" "net" "net/http" "sync" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" + "github.com/gorilla/mux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver" "go.uber.org/multierr" "google.golang.org/grpc" @@ -52,8 +51,6 @@ type configuration struct { // Receiver type is used to receive spans that were originally intended to be sent to Skywaking. // This receiver is basically a Skywalking collector. type swReceiver struct { - nextConsumer consumer.Traces - config *configuration grpc *grpc.Server @@ -63,49 +60,33 @@ type swReceiver struct { settings receiver.CreateSettings - grpcObsrecv *obsreport.Receiver - httpObsrecv *obsreport.Receiver - segmentReportService *traceSegmentReportService - dummyReportService *dummyReportService -} + traceReceiver *trace.Receiver -const ( - collectorHTTPTransport = "http" - grpcTransport = "grpc" - failing = "failing" -) + dummyReportService *dummyReportService +} // newSkywalkingReceiver creates a TracesReceiver that receives traffic as a Skywalking collector func newSkywalkingReceiver( config *configuration, - nextConsumer consumer.Traces, set receiver.CreateSettings, -) (*swReceiver, error) { +) *swReceiver { + return &swReceiver{ + config: config, + settings: set, + } +} - grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ - ReceiverID: set.ID, - Transport: grpcTransport, - ReceiverCreateSettings: set, - }) - if err != nil { - return nil, err +// registerTraceConsumer register a TracesReceiver that receives trace +func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error { + if tc == nil { + return component.ErrNilNextConsumer } - httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ - ReceiverID: set.ID, - Transport: collectorHTTPTransport, - ReceiverCreateSettings: set, - }) + var err error + sr.traceReceiver, err = trace.New(tc, sr.settings) if err != nil { - return nil, err + return err } - - return &swReceiver{ - config: config, - nextConsumer: nextConsumer, - settings: set, - grpcObsrecv: grpcObsrecv, - httpObsrecv: httpObsrecv, - }, nil + return nil } func (sr *swReceiver) collectorGRPCAddr() string { @@ -157,7 +138,7 @@ func (sr *swReceiver) startCollector(host component.Host) error { } nr := mux.NewRouter() - nr.HandleFunc("/v3/segments", sr.httpHandler).Methods(http.MethodPost) + nr.HandleFunc("/v3/segments", sr.traceReceiver.HTTPHandler).Methods(http.MethodPost) sr.collectorServer, cerr = sr.config.CollectorHTTPSettings.ToServer(host, sr.settings.TelemetrySettings, nr) if cerr != nil { return cerr @@ -178,25 +159,24 @@ func (sr *swReceiver) startCollector(host component.Host) error { if err != nil { return fmt.Errorf("failed to build the options for the Skywalking gRPC Collector: %w", err) } - gaddr := sr.collectorGRPCAddr() gln, gerr := net.Listen("tcp", gaddr) if gerr != nil { return fmt.Errorf("failed to bind to gRPC address %q: %w", gaddr, gerr) } - - sr.segmentReportService = &traceSegmentReportService{sr: sr} - v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.segmentReportService) + if sr.traceReceiver != nil { + v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.traceReceiver) + } sr.dummyReportService = &dummyReportService{} - management.RegisterManagementServiceServer(sr.grpc, sr.dummyReportService) cds.RegisterConfigurationDiscoveryServiceServer(sr.grpc, sr.dummyReportService) event.RegisterEventServiceServer(sr.grpc, &eventService{}) profile.RegisterProfileTaskServer(sr.grpc, sr.dummyReportService) - v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.dummyReportService) v3.RegisterMeterReportServiceServer(sr.grpc, &meterService{}) v3.RegisterCLRMetricReportServiceServer(sr.grpc, &clrService{}) v3.RegisterBrowserPerfServiceServer(sr.grpc, sr.dummyReportService) + //TODO: add jvm metrics service + v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.dummyReportService) sr.goroutines.Add(1) go func() { @@ -209,34 +189,3 @@ func (sr *swReceiver) startCollector(host component.Host) error { return nil } - -type Response struct { - Status string `json:"status"` - Msg string `json:"msg"` -} - -func (sr *swReceiver) httpHandler(rsp http.ResponseWriter, r *http.Request) { - rsp.Header().Set("Content-Type", "application/json") - b, err := io.ReadAll(r.Body) - if err != nil { - response := &Response{Status: failing, Msg: err.Error()} - ResponseWithJSON(rsp, response, http.StatusBadRequest) - return - } - var data []*v3.SegmentObject - if err = json.Unmarshal(b, &data); err != nil { - fmt.Printf("cannot Unmarshal skywalking segment collection, %v", err) - } - - for _, segment := range data { - err = consumeTraces(r.Context(), segment, sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - } -} - -func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) { - rsp.WriteHeader(code) - _ = json.NewEncoder(rsp).Encode(response) -} diff --git a/receiver/skywalkingreceiver/trace_receiver_test.go b/receiver/skywalkingreceiver/skywalking_receiver_test.go similarity index 72% rename from receiver/skywalkingreceiver/trace_receiver_test.go rename to receiver/skywalkingreceiver/skywalking_receiver_test.go index fc338a851045e..cfdd30c23a2e1 100644 --- a/receiver/skywalkingreceiver/trace_receiver_test.go +++ b/receiver/skywalkingreceiver/skywalking_receiver_test.go @@ -15,17 +15,21 @@ package skywalkingreceiver import ( + "bytes" "context" + "encoding/json" "fmt" + "net/http" "strconv" "testing" "time" + "go.opentelemetry.io/collector/config/confighttp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" @@ -41,9 +45,10 @@ var ( func TestTraceSource(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - jr, err := newSkywalkingReceiver(&configuration{}, nil, set) - require.NoError(t, err) - require.NotNil(t, jr) + mockSwReceiver := newSkywalkingReceiver(&configuration{}, set) + err := mockSwReceiver.registerTraceConsumer(nil) + assert.Equal(t, err, component.ErrNilNextConsumer) + require.NotNil(t, mockSwReceiver) } func TestStartAndShutdown(t *testing.T) { @@ -58,9 +63,9 @@ func TestStartAndShutdown(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - sr, err := newSkywalkingReceiver(config, sink, set) + sr := newSkywalkingReceiver(config, set) + err := sr.registerTraceConsumer(sink) require.NoError(t, err) - require.NoError(t, sr.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, sr.Shutdown(context.Background())) }) @@ -75,20 +80,19 @@ func TestGRPCReception(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - swReceiver, err := newSkywalkingReceiver(config, sink, set) + mockSwReceiver := newSkywalkingReceiver(config, set) + err := mockSwReceiver.registerTraceConsumer(sink) require.NoError(t, err) + require.NoError(t, mockSwReceiver.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, swReceiver.Start(context.Background(), componenttest.NewNopHost())) - - t.Cleanup(func() { require.NoError(t, swReceiver.Shutdown(context.Background())) }) - + t.Cleanup(func() { require.NoError(t, mockSwReceiver.Shutdown(context.Background())) }) conn, err := grpc.Dial(fmt.Sprintf("0.0.0.0:%d", config.CollectorGRPCPort), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() segmentCollection := &agent.SegmentCollection{ Segments: []*agent.SegmentObject{ - mockGrpcTraceSegment(1), + MockGrpcTraceSegment(1), }, } @@ -103,7 +107,40 @@ func TestGRPCReception(t *testing.T) { assert.NotNil(t, commands) } -func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { +func TestHttpReception(t *testing.T) { + config := &configuration{ + CollectorGRPCPort: 12800, // that's the only one used by this test + } + + sink := new(consumertest.TracesSink) + + set := receivertest.NewNopCreateSettings() + set.ID = skywalkingReceiver + mockSwReceiver := newSkywalkingReceiver(config, set) + err := mockSwReceiver.registerTraceConsumer(sink) + require.NoError(t, err) + require.NoError(t, mockSwReceiver.Start(context.Background(), componenttest.NewNopHost())) + + t.Cleanup(func() { require.NoError(t, mockSwReceiver.Shutdown(context.Background())) }) + + rb, err := mockSkywalkingHTTPTraceSegment() + require.NoError(t, err) + req, err := http.NewRequest("POST", "http://0.0.0.0:12800/v3/segments", bytes.NewBuffer(rb)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + response, err := client.Do(req) + // http client send trace data to otel/skywalkingreceiver + if err != nil { + t.Fatalf("cannot send data in sync mode: %v", err) + } + // verify + assert.NoError(t, err, "send skywalking segment successful.") + assert.NotNil(t, response) + +} + +func MockGrpcTraceSegment(sequence int) *agent.SegmentObject { seq := strconv.Itoa(sequence) return &agent.SegmentObject{ TraceId: "trace" + seq, @@ -187,3 +224,9 @@ func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { }, } } + +func mockSkywalkingHTTPTraceSegment() ([]byte, error) { + segment := MockGrpcTraceSegment(1) + tb, err := json.Marshal(segment) + return tb, err +} diff --git a/receiver/skywalkingreceiver/tracing_report_service.go b/receiver/skywalkingreceiver/tracing_report_service.go deleted file mode 100644 index 05ee826bfda56..0000000000000 --- a/receiver/skywalkingreceiver/tracing_report_service.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" - -import ( - "context" - "errors" - "fmt" - "io" - - "go.opentelemetry.io/collector/consumer" - "google.golang.org/protobuf/proto" - common "skywalking.apache.org/repo/goapi/collect/common/v3" - agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" -) - -type traceSegmentReportService struct { - sr *swReceiver - agent.UnimplementedTraceSegmentReportServiceServer -} - -func (s *traceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error { - for { - segmentObject, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return stream.SendAndClose(&common.Commands{}) - } - return err - } - - err = consumeTraces(stream.Context(), segmentObject, s.sr.nextConsumer) - if err != nil { - return stream.SendAndClose(&common.Commands{}) - } - } -} - -func (s *traceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { - for _, segment := range segments.Segments { - marshaledSegment, err := proto.Marshal(segment) - if err != nil { - fmt.Printf("cannot marshal segemnt from sync, %v", err) - } - err = consumeTraces(ctx, segment, s.sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - fmt.Printf("receivec data:%s", marshaledSegment) - } - return &common.Commands{}, nil -} - -func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { - if segment == nil { - return nil - } - ptd := SkywalkingToTraces(segment) - return consumer.ConsumeTraces(ctx, ptd) -} From 9226821f0b5b2038d4e8d0eef0816e79488af1bc Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Mon, 27 Mar 2023 23:03:06 +0800 Subject: [PATCH 02/14] add CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55a0b13ff78f1..955ae1f695e59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ - `splunkhecreceiver`: Appends query param (index, source, sourcetype, and host) for raw path (#19632) - `splunkhecreceiver`: align error message with splunk enterprise to include No Data, Invalid Data Format, Event field is required, and Event field cannot be blank (#19219) - `reciver/statsdreceiver`: Metrics emitted by the statsd receiver are batched by source IP address, available in context. (#15290) +- `receiver/skywalking`: Refactor code structure/directory in order to add metrics receiver. (#20344) ### 🧰 Bug fixes 🧰 From c08500005cb6e317358cda655d43a00654f81814 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Mon, 27 Mar 2023 23:30:09 +0800 Subject: [PATCH 03/14] remove local config.yaml --- cmd/otelcontribcol/config.yaml | 31 ------------------------------- 1 file changed, 31 deletions(-) delete mode 100644 cmd/otelcontribcol/config.yaml diff --git a/cmd/otelcontribcol/config.yaml b/cmd/otelcontribcol/config.yaml deleted file mode 100644 index e966e3bac98a6..0000000000000 --- a/cmd/otelcontribcol/config.yaml +++ /dev/null @@ -1,31 +0,0 @@ -extensions: -receivers: -# otlp: -# protocols: -# grpc: -# http: - skywalking: - protocols: - grpc: - endpoint: 0.0.0.0:11800 - http: - endpoint: 0.0.0.0:12800 -exporters: - logging: - loglevel: -1 -service: - - pipelines: - - traces: - receivers: [skywalking] - processors: - exporters: [logging] -# metrics: -# receivers: [skywalking] -# processors: -# exporters: [logging] - - - - extensions: [] \ No newline at end of file From 034a5f0dea64d0cc443e18bcd0493bd06770be66 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Tue, 28 Mar 2023 10:47:14 +0800 Subject: [PATCH 04/14] remove CHANGELOG --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 955ae1f695e59..55a0b13ff78f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,6 @@ - `splunkhecreceiver`: Appends query param (index, source, sourcetype, and host) for raw path (#19632) - `splunkhecreceiver`: align error message with splunk enterprise to include No Data, Invalid Data Format, Event field is required, and Event field cannot be blank (#19219) - `reciver/statsdreceiver`: Metrics emitted by the statsd receiver are batched by source IP address, available in context. (#15290) -- `receiver/skywalking`: Refactor code structure/directory in order to add metrics receiver. (#20344) ### 🧰 Bug fixes 🧰 From 735b9216711c69b5c29bb0e7ab7ad01c8df848e2 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Tue, 28 Mar 2023 13:19:29 +0800 Subject: [PATCH 05/14] fix lint --- receiver/skywalkingreceiver/factory.go | 4 ++-- receiver/skywalkingreceiver/factory_test.go | 6 +++--- .../internal/trace/skywalkingproto_to_traces_test.go | 3 +-- receiver/skywalkingreceiver/skywalking_receiver.go | 4 ++-- receiver/skywalkingreceiver/skywalking_receiver_test.go | 3 +-- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/receiver/skywalkingreceiver/factory.go b/receiver/skywalkingreceiver/factory.go index 4e32e6ef47eca..b118da105dde9 100644 --- a/receiver/skywalkingreceiver/factory.go +++ b/receiver/skywalkingreceiver/factory.go @@ -22,14 +22,14 @@ import ( "net" "strconv" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" ) const ( diff --git a/receiver/skywalkingreceiver/factory_test.go b/receiver/skywalkingreceiver/factory_test.go index 173dc448939d9..be57dadf4d338 100644 --- a/receiver/skywalkingreceiver/factory_test.go +++ b/receiver/skywalkingreceiver/factory_test.go @@ -19,9 +19,6 @@ import ( "path/filepath" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" - "go.opentelemetry.io/collector/consumer/consumertest" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -31,7 +28,10 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" ) func TestTypeStr(t *testing.T) { diff --git a/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go index bc60b63da0641..76b935d9d0c2c 100644 --- a/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go @@ -19,11 +19,10 @@ import ( "testing" "time" - common "skywalking.apache.org/repo/goapi/collect/common/v3" - "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + common "skywalking.apache.org/repo/goapi/collect/common/v3" agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) diff --git a/receiver/skywalkingreceiver/skywalking_receiver.go b/receiver/skywalkingreceiver/skywalking_receiver.go index feba434791f4b..043c7388a0e74 100644 --- a/receiver/skywalkingreceiver/skywalking_receiver.go +++ b/receiver/skywalkingreceiver/skywalking_receiver.go @@ -22,8 +22,6 @@ import ( "net/http" "sync" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" - "github.com/gorilla/mux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -37,6 +35,8 @@ import ( v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" profile "skywalking.apache.org/repo/goapi/collect/language/profile/v3" management "skywalking.apache.org/repo/goapi/collect/management/v3" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" ) // configuration defines the behavior and the ports that diff --git a/receiver/skywalkingreceiver/skywalking_receiver_test.go b/receiver/skywalkingreceiver/skywalking_receiver_test.go index cfdd30c23a2e1..1bafff12dab8f 100644 --- a/receiver/skywalkingreceiver/skywalking_receiver_test.go +++ b/receiver/skywalkingreceiver/skywalking_receiver_test.go @@ -24,12 +24,11 @@ import ( "testing" "time" - "go.opentelemetry.io/collector/config/confighttp" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" From 489ae913180cbd1d3679d1cf504109ca3a462c55 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Thu, 30 Mar 2023 23:47:32 +0800 Subject: [PATCH 06/14] rename func --- .../internal/trace/tracing_report_service.go | 4 ++-- receiver/skywalkingreceiver/skywalking_receiver.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go index 66ec2f35b7037..77eca4a46263b 100644 --- a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -44,8 +44,8 @@ type Receiver struct { agent.UnimplementedTraceSegmentReportServiceServer } -// New creates a new Receiver reference. -func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) { +// NewReceiver creates a new Receiver reference. +func NewReceiver(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) { grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ ReceiverID: set.ID, Transport: grpcTransport, diff --git a/receiver/skywalkingreceiver/skywalking_receiver.go b/receiver/skywalkingreceiver/skywalking_receiver.go index 043c7388a0e74..98ad31fea84f2 100644 --- a/receiver/skywalkingreceiver/skywalking_receiver.go +++ b/receiver/skywalkingreceiver/skywalking_receiver.go @@ -82,7 +82,7 @@ func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error { return component.ErrNilNextConsumer } var err error - sr.traceReceiver, err = trace.New(tc, sr.settings) + sr.traceReceiver, err = trace.NewReceiver(tc, sr.settings) if err != nil { return err } From f989dfbe851516d2dafa54dd6ddd0b28970b58f8 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Fri, 31 Mar 2023 10:40:47 +0800 Subject: [PATCH 07/14] run make goproto and fix --- .../internal/trace/skywalkingproto_to_traces.go | 2 +- .../skywalkingreceiver/internal/trace/tracing_report_service.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go index ab1c7a7fda4ca..1651f44c9b6af 100644 --- a/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package trace +package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" import ( "bytes" diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go index 77eca4a46263b..546763d377019 100644 --- a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" +package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" import ( "context" From 3fea1c532e0e8a5bd368483c604af9c6e57b5958 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Sat, 1 Apr 2023 11:36:48 +0800 Subject: [PATCH 08/14] fix http reception unit test --- .../skywalking_receiver_test.go | 55 ++++++++++++++----- 1 file changed, 42 insertions(+), 13 deletions(-) diff --git a/receiver/skywalkingreceiver/skywalking_receiver_test.go b/receiver/skywalkingreceiver/skywalking_receiver_test.go index 1bafff12dab8f..4eaf4e6c52288 100644 --- a/receiver/skywalkingreceiver/skywalking_receiver_test.go +++ b/receiver/skywalkingreceiver/skywalking_receiver_test.go @@ -17,7 +17,6 @@ package skywalkingreceiver import ( "bytes" "context" - "encoding/json" "fmt" "net/http" "strconv" @@ -41,6 +40,43 @@ var ( skywalkingReceiver = component.NewIDWithName("skywalking", "receiver_test") ) +var traceJSON = []byte(` + [{ + "traceId": "a12ff60b-5807-463b-a1f8-fb1c8608219e", + "serviceInstance": "User_Service_Instance_Name", + "spans": [{ + "operationName": "/ingress", + "startTime": 1588664577013, + "endTime": 1588664577028, + "spanType": 0, + "spanId": 1, + "isError": false, + "parentSpanId": 0, + "componentId": 6000, + "peer": "upstream service", + "spanLayer": 3 + }, { + "operationName": "/ingress", + "startTime": 1588664577013, + "tags": [{ + "key": "http.method", + "value": "GET" + }, { + "key": "http.params", + "value": "http://localhost/ingress" + }], + "endTime": 1588664577028, + "spanType": 1, + "spanId": 0, + "parentSpanId": -1, + "isError": false, + "spanLayer": 3, + "componentId": 6000 + }], + "service": "User_Service_Name", + "traceSegmentId": "a12ff60b-5807-463b-a1f8-fb1c8608219e" +}]`) + func TestTraceSource(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver @@ -108,7 +144,10 @@ func TestGRPCReception(t *testing.T) { func TestHttpReception(t *testing.T) { config := &configuration{ - CollectorGRPCPort: 12800, // that's the only one used by this test + CollectorHTTPPort: 12800, + CollectorHTTPSettings: confighttp.HTTPServerSettings{ + Endpoint: fmt.Sprintf(":%d", 12800), + }, } sink := new(consumertest.TracesSink) @@ -119,12 +158,8 @@ func TestHttpReception(t *testing.T) { err := mockSwReceiver.registerTraceConsumer(sink) require.NoError(t, err) require.NoError(t, mockSwReceiver.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, mockSwReceiver.Shutdown(context.Background())) }) - - rb, err := mockSkywalkingHTTPTraceSegment() - require.NoError(t, err) - req, err := http.NewRequest("POST", "http://0.0.0.0:12800/v3/segments", bytes.NewBuffer(rb)) + req, err := http.NewRequest("POST", "http://127.0.0.1:12800/v3/segments", bytes.NewBuffer(traceJSON)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") client := &http.Client{} @@ -223,9 +258,3 @@ func MockGrpcTraceSegment(sequence int) *agent.SegmentObject { }, } } - -func mockSkywalkingHTTPTraceSegment() ([]byte, error) { - segment := MockGrpcTraceSegment(1) - tb, err := json.Marshal(segment) - return tb, err -} From 7caa46f0a2cf5c1dfcae952cb089a805a0e5755a Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Sat, 1 Apr 2023 11:47:27 +0800 Subject: [PATCH 09/14] add change log --- .chloggen/skywalking_add_metric.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100755 .chloggen/skywalking_add_metric.yaml diff --git a/.chloggen/skywalking_add_metric.yaml b/.chloggen/skywalking_add_metric.yaml new file mode 100755 index 0000000000000..1d0075083462d --- /dev/null +++ b/.chloggen/skywalking_add_metric.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: skywalkingreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Refactoring the code structure/directory for the following metrics receiver implementation + +# One or more tracking issues related to the change +issues: [20315] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 2ca357eedcf0b655004f88483d66f5d39a453d15 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Thu, 13 Apr 2023 21:18:27 +0800 Subject: [PATCH 10/14] mod tidy --- receiver/skywalkingreceiver/go.sum | 1 - 1 file changed, 1 deletion(-) diff --git a/receiver/skywalkingreceiver/go.sum b/receiver/skywalkingreceiver/go.sum index 428dab116d571..6e455cff2525f 100644 --- a/receiver/skywalkingreceiver/go.sum +++ b/receiver/skywalkingreceiver/go.sum @@ -433,7 +433,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= From 3813f4ab3cdb85624757d2ac0e6aada1327a4122 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Thu, 4 May 2023 17:46:56 +0800 Subject: [PATCH 11/14] mod tidy --- receiver/skywalkingreceiver/go.sum | 3 +- .../tracing_report_service.go | 72 ------------------- 2 files changed, 2 insertions(+), 73 deletions(-) delete mode 100644 receiver/skywalkingreceiver/tracing_report_service.go diff --git a/receiver/skywalkingreceiver/go.sum b/receiver/skywalkingreceiver/go.sum index e95873c509b1b..c8a1d4b3b055e 100644 --- a/receiver/skywalkingreceiver/go.sum +++ b/receiver/skywalkingreceiver/go.sum @@ -68,6 +68,7 @@ github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -243,7 +244,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= -github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -434,6 +434,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/receiver/skywalkingreceiver/tracing_report_service.go b/receiver/skywalkingreceiver/tracing_report_service.go deleted file mode 100644 index f7a2d76679969..0000000000000 --- a/receiver/skywalkingreceiver/tracing_report_service.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" - -import ( - "context" - "errors" - "fmt" - "io" - - "go.opentelemetry.io/collector/consumer" - "google.golang.org/protobuf/proto" - common "skywalking.apache.org/repo/goapi/collect/common/v3" - agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" -) - -type traceSegmentReportService struct { - sr *swReceiver - agent.UnimplementedTraceSegmentReportServiceServer -} - -func (s *traceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error { - for { - segmentObject, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return stream.SendAndClose(&common.Commands{}) - } - return err - } - - err = consumeTraces(stream.Context(), segmentObject, s.sr.nextConsumer) - if err != nil { - return stream.SendAndClose(&common.Commands{}) - } - } -} - -func (s *traceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { - for _, segment := range segments.Segments { - marshaledSegment, err := proto.Marshal(segment) - if err != nil { - fmt.Printf("cannot marshal segemnt from sync, %v", err) - } - err = consumeTraces(ctx, segment, s.sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - fmt.Printf("receivec data:%s", marshaledSegment) - } - return &common.Commands{}, nil -} - -func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { - if segment == nil { - return nil - } - ptd := SkywalkingToTraces(segment) - return consumer.ConsumeTraces(ctx, ptd) -} From 804434919c139e59f5caa96cfec5b669b63d84d9 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Thu, 11 May 2023 10:24:16 +0800 Subject: [PATCH 12/14] fix lint --- .../internal/trace/tracing_report_service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go index 546763d377019..29a7276ead773 100644 --- a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -1,10 +1,10 @@ -// Copyright OpenTelemetry Authors +// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, From 3bdc5ed003625b08bf5049047f95cf57f94681d5 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Mon, 15 May 2023 11:05:37 +0800 Subject: [PATCH 13/14] resolve conflicts --- receiver/skywalkingreceiver/factory.go | 3 +-- receiver/skywalkingreceiver/factory_test.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/receiver/skywalkingreceiver/factory.go b/receiver/skywalkingreceiver/factory.go index 89be29cf60744..859e1cde5be7b 100644 --- a/receiver/skywalkingreceiver/factory.go +++ b/receiver/skywalkingreceiver/factory.go @@ -29,9 +29,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata" ) const ( diff --git a/receiver/skywalkingreceiver/factory_test.go b/receiver/skywalkingreceiver/factory_test.go index eaf4576b1e5a2..39900ceb89c73 100644 --- a/receiver/skywalkingreceiver/factory_test.go +++ b/receiver/skywalkingreceiver/factory_test.go @@ -31,9 +31,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata" ) func TestTypeStr(t *testing.T) { From 486802e64d81bdd5d5b98505d500ecd5c898b629 Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Thu, 18 May 2023 10:37:17 +0800 Subject: [PATCH 14/14] resolve conflicts --- .../internal/trace/tracing_report_service.go | 13 +--- .../tracing_report_service.go | 61 ------------------- 2 files changed, 1 insertion(+), 73 deletions(-) delete mode 100644 receiver/skywalkingreceiver/tracing_report_service.go diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go index 29a7276ead773..343f20db17a10 100644 --- a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" diff --git a/receiver/skywalkingreceiver/tracing_report_service.go b/receiver/skywalkingreceiver/tracing_report_service.go deleted file mode 100644 index eec5b13284af6..0000000000000 --- a/receiver/skywalkingreceiver/tracing_report_service.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" - -import ( - "context" - "errors" - "fmt" - "io" - - "go.opentelemetry.io/collector/consumer" - "google.golang.org/protobuf/proto" - common "skywalking.apache.org/repo/goapi/collect/common/v3" - agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" -) - -type traceSegmentReportService struct { - sr *swReceiver - agent.UnimplementedTraceSegmentReportServiceServer -} - -func (s *traceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error { - for { - segmentObject, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return stream.SendAndClose(&common.Commands{}) - } - return err - } - - err = consumeTraces(stream.Context(), segmentObject, s.sr.nextConsumer) - if err != nil { - return stream.SendAndClose(&common.Commands{}) - } - } -} - -func (s *traceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { - for _, segment := range segments.Segments { - marshaledSegment, err := proto.Marshal(segment) - if err != nil { - fmt.Printf("cannot marshal segemnt from sync, %v", err) - } - err = consumeTraces(ctx, segment, s.sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - fmt.Printf("receivec data:%s", marshaledSegment) - } - return &common.Commands{}, nil -} - -func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { - if segment == nil { - return nil - } - ptd := SkywalkingToTraces(segment) - return consumer.ConsumeTraces(ctx, ptd) -}